🏗️ Архитектура корутин

┌─────────────────────────────────────────────────────────┐
│        KOTLIN COROUTINES ARCHITECTURE LAYERS             │
├─────────────────────────────────────────────────────────┤
│                                                           │
│  FOUNDATION LAYER (Основы)                              │
│  ├── suspend функции (приостановимость)                 │
│  ├── Continuation (internal компилятор)                 │
│  └── State machine (компилятор генерирует)              │
│                                                           │
│  DISPATCHERS LAYER (Где выполняется)                    │
│  ├── Main    ← UI поток (1 поток)                       │
│  ├── IO      ← I/O операции (64+ потоков)               │
│  ├── Default ← CPU задачи (CPU cores потоков)           │
│  └── Unconfined ← текущий поток (специальные случаи)    │
│                                                           │
│  BUILDERS LAYER (Запуск корутин)                        │
│  ├── launch  ← Fire-and-forget (Job)                    │
│  ├── async   ← Параллельно с результатом (Deferred<T>) │
│  └── runBlocking ← Синхронный мост (только для тестов)  │
│                                                           │
│  SCOPE LAYER (Иерархия)                                 │
│  ├── CoroutineScope ← обычная иерархия                  │
│  ├── coroutineScope{} ← временная область               │
│  └── supervisorScope{} ← изолированные дети             │
│                                                           │
│  CONTEXT LAYER (Конфигурация)                           │
│  ├── Dispatcher (где)                                   │
│  ├── Job (иерархия)                                     │
│  ├── CoroutineName (отладка)                            │
│  ├── CoroutineExceptionHandler (ошибки)                 │
│  └── Custom elements (для приложений)                   │
│                                                           │
│  FLOW LAYER (Асинхронные потоки)                        │
│  ├── Flow<T> ← холодные потоки (pull model)             │
│  ├── SharedFlow<T> ← горячие события (push model)       │
│  ├── StateFlow<T> ← состояние с кешем                   │
│  └── Channel<T> ← синхронизация между корутинами        │
│                                                           │
│  CANCELLATION LAYER (Отмена)                            │
│  ├── cancel() ← инициация отмены                        │
│  ├── ensureActive() ← проверка статуса                  │
│  └── CancellationException ← сигнал отмены              │
│                                                           │
│  INTEGRATION LAYER (Фреймворки)                         │
│  ├── Spring WebFlux ← REST endpoints                    │
│  ├── Spring Data R2DBC ← БД доступ                      │
│  ├── Room (Android) ← SQLite                            │
│  └── Ktor ← HTTP client/server                          │
│                                                           │
└─────────────────────────────────────────────────────────┘

📊 Выбор правильного инструмента

┌─────────────────────────────────────────────────────────────┐
│            DECISION TREE - ВЫБОР ИНСТРУМЕНТА               │
└─────────────────────────────────────────────────────────────┘

1. Нужен ли результат выполнения?
   │
   ├─ ✅ ДА:
   │  └─ Используй async
   │     ├─ Параллельное выполнение? → async + awaitAll()
   │     └─ Последовательно? → async + await()
   │
   └─ ❌ НЕТ:
      └─ Используй launch
         ├─ Side effects (логирование, отправка)
         └─ Fire-and-forget операции

2. Работаешь с потоком данных?
   │
   ├─ ✅ ДА - холодный поток:
   │  └─ Используй Flow<T>
   │     ├─ API вызовы → flow { emit() }
   │     ├─ БД запросы → repository.dataFlow()
   │     └─ Файловые операции → lines.asFlow()
   │
   └─ ✅ ДА - горячий поток:
      ├─ События → SharedFlow<Event>
      └─ Состояние → StateFlow<State>

3. Нужна синхронизация между корутинами?
   │
   ├─ ✅ ДА - один-к-одному:
   │  └─ Channel<T>
   │     ├─ Producer → send()
   │     └─ Consumer → receive()
   │
   └─ ✅ ДА - один-ко-многим:
      └─ SharedFlow или Channel (broadcast)

4. Нужен ли timeout?
   │
   ├─ ✅ ДА:
   │  ├─ withTimeout() → бросает TimeoutCancellationException
   │  └─ withTimeoutOrNull() → возвращает null при timeout
   │
   └─ ❌ НЕТ: обычный код

5. Какая стратегия ошибок?
   │
   ├─ ✅ "ВСЕ или НИЧЕГО" (критичные операции):
   │  └─ coroutineScope { }
   │     └─ Ошибка одного = отмена всех
   │
   └─ ✅ "ЧАСТИЧНЫЙ УСПЕХ" (независимые операции):
      └─ supervisorScope { }
         └─ Ошибка одного ≠ отмена других

6. Какой контекст?
   │
   ├─ I/O операции (БД, сеть, файлы):
   │  └─ Dispatchers.IO
   │
   ├─ CPU-интенсивные:
   │  └─ Dispatchers.Default
   │
   ├─ UI операции (Android/Desktop):
   │  └─ Dispatchers.Main
   │
   └─ Спец. случаи (тесты):
      └─ Dispatchers.Unconfined

🔄 Сравнение Launch vs Async

TIMING DIAGRAM
==============

Без параллелизма (последовательно):

Operation 1: ▓▓▓▓▓▓▓▓ 1000ms
Operation 2:         ▓▓▓▓▓▓▓▓ 1000ms
Operation 3:                 ▓▓▓▓▓▓▓▓ 1000ms
─────────────────────────────────────────────
Итого:                                 3000ms


С параллелизмом (async):

Operation 1: ▓▓▓▓▓▓▓▓ 1000ms ┐
Operation 2: ▓▓▓▓▓▓▓▓ 1000ms ├─ параллельно
Operation 3: ▓▓▓▓▓▓▓▓ 1000ms ┘
─────────────────────────────────────────────
Итого:       ▓▓▓▓▓▓▓▓  1000ms (максимум)


ОСОБЕННОСТЬ LAUNCH:

launch { }  ← НЕ ждёт завершения
  │
  └─ Возвращает сразу (Job)
       │
       └─ Код выполняется в background


ОСОБЕННОСТЬ ASYNC:

async { } ← Ждёт await()
  │
  ├─ Возвращает Deferred<T>
  │
  └─ Требует вызвать await()
       │
       └─ Блокирует вызывающую корутину
Аспект launch async
Возвращаемое значение Job Deferred
Получение результата Не предусмотрено .await()
Назначение Side effects Параллельные вычисления
Исключения Всплывают к родителю Заморожены до await()
Блокировка Не блокирует вызывающую Блокирует на await()

Практические примеры

// ❌ ПЛОХО - неправильное использование launch
suspend fun wrongUsage(): UserData {
    var userData: UserData? = null
    
    launch { // launch НЕ блокирует выполнение!
        userData = fetchUserData()
    }
    
    return userData!! // ОШИБКА! userData всегда null
}

// ✅ ХОРОШО - launch для side effects
class NotificationService {
    suspend fun processOrder(order: Order) {
        // Основная логика
        val result = validateAndProcessOrder(order)
        
        // Независимые side effects - запускаются параллельно
        launch { sendOrderConfirmation(order.email) }
        launch { updateInventory(order.items) }
        launch { logOrderEvent(order.id) }
        launch { updateAnalytics(order) }
        
        return result
    }
}

// ✅ ХОРОШО - async для параллельных операций с результатом
suspend fun loadUserDashboard(userId: String): Dashboard = coroutineScope {
    // Запускаем все запросы параллельно
    val profileAsync = async { profileService.getProfile(userId) }
    val ordersAsync = async { orderService.getOrders(userId) }
    val preferencesAsync = async { prefsService.getPreferences(userId) }
    
    // Ждём результатов (максимум времени самого долгого запроса)
    Dashboard(
        profile = profileAsync.await(),
        orders = ordersAsync.await(),
        prefs = preferencesAsync.await()
    )
}

⏱️ Timeline диаграмма Structured Concurrency

PARENT COROUTINE LIFECYCLE
==========================

Parent launch/async
    │
    ├─── Child 1 launch
    │    └─ может упасть ❌
    │
    ├─── Child 2 async
    │    └─ suspended в await()
    │
    └─── Child 3 async
         └─ в процессе выполнения

СЦЕНАРИЙ 1: Обычный Job (все связаны)
───────────────────────────────────────

Child 1 throws Exception
    │
    ├─ отменяет Child 2
    ├─ отменяет Child 3
    └─ отменяет Parent

    Результат: ВСЕ завершены с ошибкой ❌


СЦЕНАРИЙ 2: SupervisorJob (изолированы)
─────────────────────────────────────────

Child 1 throws Exception
    │
    ├─ Child 1 logs error ❌
    ├─ Child 2 continues ✓
    ├─ Child 3 continues ✓
    └─ Parent continues ✓

    Результат: Child 1 упал, остальные работают

🚦 Flow vs Reactive Streams

EXECUTION MODEL COMPARISON
===========================

Flow (Cold Stream):
   User₁ subscribes        User₂ subscribes
        │                       │
        ├─► Producer 1      ├─► Producer 2
        │   (fresh)         │   (fresh)
        └─► Emit 1,2,3      └─► Emit 1,2,3
        
   Каждый подписчик получает независимый поток

Rx Observable (Hot/Cold):
   Producer (once)
        │
        ├─► Observer₁ (может пропустить данные)
        ├─► Observer₂ (может пропустить данные)
        └─► Observer₃ (может пропустить данные)
        
   Один producerдля всех

StateFlow (Hot State):
   [Current State: 42]
        │
        ├─► Subscriber₁ (получает 42 сразу)
        ├─► Subscriber₂ (получает 42 сразу)
        └─► Subscriber₃ (получает 42 сразу)
        
   Новые подписчики получают последнее состояние

🔗 Dispatcher Selection Guide

REQUEST JOURNEY
================

┌─────────────────────────────────┐
│   Incoming Request              │
└──────────────┬──────────────────┘
               │
        ┌──────▼──────┐
        │ withContext │
        └──────┬──────┘
               │
     ┌─────────┴──────────┬──────────────┐
     │                    │              │
     │                    │              │
   IO             Default/CPU         Main/UI
─────────────────────────────────────────────
 
 • БД запросы      • Сортировка        • Обновление UI
 • HTTP calls      • Парсинг JSON      • Прогресс бары
 • Файлы           • Хеширование       • Анимации
 • Сокеты          • Шифрование        • Toast/Dialogs
 
 Pool: 64+         Pool: CPU cores     Pool: 1 (single)
 threads           threads              thread

📝 Основы корутин

Suspend функции

// Suspend функция может быть приостановлена
suspend fun fetchUserData(id: String): UserData {
    val profile = fetchProfile(id)    // suspension point
    val settings = fetchSettings(id)  // suspension point
    return UserData(profile, settings)
}

// Обычная функция НЕ может вызывать suspend напрямую
fun normalFunction() {
    // fetchUserData("123") // ОШИБКА компиляции!
}

// Suspend функции можно вызывать только из:
// 1. Других suspend функций
// 2. Корутинных bilders (launch, async)
// 3. runBlocking (только для тестов/main)

Корутинные билдеры

// launch - Fire and Forget
val job: Job = scope.launch(Dispatchers.IO) {
    doSomething() // не возвращает результат
}
job.cancel()

// async - Параллельное выполнение с результатом
val deferred: Deferred<String> = scope.async(Dispatchers.IO) {
    computeSomething() // возвращает значение
}
val result = deferred.await()

// runBlocking - Синхронный мост (ТОЛЬКО для тестов!)
fun main() = runBlocking {
    val result = suspendingFunction()
    println(result)
}

CoroutineScope - область жизни

// GlobalScope - живёт вечно (ОПАСНО! утечки памяти)
GlobalScope.launch { /* может пережить ваш объект */ }

// Кастомный scope - контролируемый жизненный цикл
class UserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun loadData() = launch {
        val data = fetchUserData()
        updateCache(data)
    }
    
    fun cleanup() {
        job.cancel() // отменяет все корутины
    }
}

// viewModelScope (Android) - автоматическая отмена
class UserViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch { // автоматически отменится в onCleared()
            val data = loadFromNetwork()
            _uiState.value = UiState.Success(data)
        }
    }
}

🔄 Flow - асинхронные потоки

// Создание Flow
fun numbersFlow(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(1000)
        emit(i)
    }
}

// Трансформация данных
fun processNumbers() = numbersFlow()
    .filter { it % 2 == 0 }
    .map { "Number: $it" }
    .flowOn(Dispatchers.Default)
    .onEach { log.info(it) }
    .catch { exception -> emit("Error: ${exception.message}") }

// Потребление Flow
suspend fun consumeFlow() {
    processNumbers().collect { value ->
        println(value)
    }
}

// StateFlow для управления состоянием
class UserViewModel {
    private val _state = MutableStateFlow<UiState>(UiState.Loading)
    val state: StateFlow<UiState> = _state.asStateFlow()
    
    fun loadUser(id: String) {
        viewModelScope.launch {
            try {
                _state.value = UiState.Loading
                val user = userService.getUser(id)
                _state.value = UiState.Success(user)
            } catch (e: Exception) {
                _state.value = UiState.Error(e.message)
            }
        }
    }
}

🛡️ Отмена и обработка ошибок

// Отмена корутины
val job = launch { /* code */ }
job.cancel()              // отменить
job.cancelAndJoin()       // отменить и дождаться

// Проверка статуса отмены
suspend fun longRunningTask() {
    repeat(1000) { i ->
        ensureActive()  // throws CancellationException если отменена
        if (!isActive) { // мягкая проверка
            cleanup()
            return
        }
        doWork(i)
        delay(10)
    }
}

// Обработка исключений
val exceptionHandler = CoroutineExceptionHandler { _, exception ->
    log.error("Coroutine failed", exception)
}

val scope = CoroutineScope(Dispatchers.IO + SupervisorJob() + exceptionHandler)

// Try-catch в корутинах
suspend fun safeOperation() {
    try {
        riskyOperation()
    } catch (e: CancellationException) {
        cleanup()
        throw e  // ВСЕГДА перебрасываем
    } catch (e: Exception) {
        handleError(e)
    }
}

📌 Ключевые моменты

  1. Suspend функции — не блокируют потоки, приостанавливают выполнение
  2. launch — fire-and-forget для side effects
  3. async — параллельное выполнение с результатом через await()
  4. Dispatchers выбираются по типу операции (IO/Default/Main)
  5. Structured Concurrency автоматизирует управление жизненным циклом
  6. Flow — холодные потоки для трансформации данных
  7. StateFlow — горячее состояние с кешем последнего значения
  8. CancellationException всегда перебрасывать, не логировать как ошибку
  9. GlobalScope — антипаттерн, вызывает утечки памяти
  10. SupervisorJob — для изоляции ошибок независимых корутин

Основы корутин

Что такое корутина?

Корутина (Coroutine) — это легковесная единица асинхронного выполнения, которая может быть приостановлена (suspended) и возобновлена позже без блокировки потока. Корутины позволяют писать асинхронный код в последовательном стиле.

Ключевые концепции:

  • Suspension — возможность приостановить выполнение функции
  • Non-blocking — не блокируют поток во время ожидания
  • Structured concurrency — корутины организованы в иерархию с автоматической отменой
  • Cooperative multitasking — корутины добровольно отдают управление
// Простая корутина
suspend fun fetchUser(id: String): User {
    delay(1000) // приостанавливает корутину, НЕ блокирует поток
    return userRepository.findById(id)
}

📊 Отличия корутин от потоков

Потокоёмкость: Threads vs Coroutines

THREAD POOL MODEL (Traditional Java)
====================================

Thread Pool (10 потоков)
│
├─ Thread 1 [BlockedI/O: fetchDB()]
│  └─ Stack: ~1-2 MB
│
├─ Thread 2 [BlockedI/O: callAPI()]
│  └─ Stack: ~1-2 MB
│
├─ Thread 3 [BlockedI/O: readFile()]
│  └─ Stack: ~1-2 MB
│
├─ Thread 4-8 [Blocked/Idle...]
│  └─ Wasting ~8-10 MB!
│
└─ Thread 9-10 [Available]
   
Resources WASTED on blocking!
Can handle ~1000 concurrent requests


COROUTINE MODEL (Kotlin)
=========================

Dispatcher (2 active threads)
│
├─ Thread 1 ─────────────────────────────
│   ├─ Coroutine 1 [I/O]  ──┐
│   ├─ Coroutine 2 [CPU]  ──┤ All lightweight!
│   ├─ Coroutine 3 [I/O]  ──┤ ~100 bytes each
│   └─ ... 10,000 more   ──┘
│
└─ Thread 2 ─────────────────────────────
    ├─ Coroutine 10001 [I/O]
    ├─ Coroutine 10002 [CPU]
    └─ ...

Resources EFFICIENT!
Can handle ~1 MILLION concurrent requests

Таблица сравнения: Thread vs Coroutine

Аспект Thread Coroutine
Вес ~1-8 MB стека ~100 байт объекта
Создание Дорогое (OS scheduling) Дешёвое (~microseconds)
Переключение Context switch (~microseconds) Virtual (~nanoseconds)
Масштабируемость ~1000 потоков max ~1 000 000 корутин
Блокировка Блокирует весь поток Приостанавливает корутину
Memory footprint 10,000 threads = 10-80GB 10,000 корутин = ~1MB
Отмена Unsafe (Thread.stop deprecated) Safe (cooperative)
Синхронизация Mutex, Semaphore, Lock Suspend, Channel

🔄 Практический пример: 1000 HTTP запросов

❌ Плохо: Thread Pool подход

// Используем Executor с 100 потоками
private val executor = Executors.newFixedThreadPool(100)

fun fetchUsersThreadApproach(userIds: List<String>) {
    val futures = userIds.map { userId ->
        executor.submit {
            val user = blockingHttpCall("/users/$userId") // БЛОКИРУЕТ ПОТОК!
            println("Loaded: $user")
        }
    }
    
    futures.forEach { it.get() } // ждём всех
}

// Проблемы:
// 1. Нужно 100 потоков для 1000 запросов
// 2. Если поток блокируется, деградация производительности
// 3. Переключение контекста = дорого
// 4. ~800 MB памяти вплиялась!

✅ Хорошо: Coroutine подход

suspend fun fetchUsersCoroutineApproach(userIds: List<String>) = withContext(Dispatchers.IO) {
    userIds.map { userId ->
        async { // только запуск, не блокирует!
            val user = apiClient.getUser(userId) // SUSPEND, не блокирует!
            println("Loaded: $user")
            user
        }
    }.awaitAll() // ждём всех результатов
}

// Преимущества:
// 1. Нужно 2 потока для 1000 запросов!
// 2. Приостановка = не теряется CPU
// 3. Переключение контекста между корутинами = дешёво
// 4. ~1 MB памяти!

📈 Batch обработка: когда нужны корутины

10 пользователей, 1 запрос на пользователя (100ms каждый)

SEQUENTIAL (обычный подход):
User 1  ▓▓▓▓▓▓▓▓ 100ms
User 2          ▓▓▓▓▓▓▓▓ 100ms
User 3                  ▓▓▓▓▓▓▓▓ 100ms
...
User 10                              ▓▓▓▓▓▓▓▓ 100ms
─────────────────────────────────────────────────── 1000ms TOTAL!


THREAD POOL (10 потоков):
User 1  ▓▓▓▓▓▓▓▓
User 2  ▓▓▓▓▓▓▓▓
User 3  ▓▓▓▓▓▓▓▓
...
User 10 ▓▓▓▓▓▓▓▓
─────────────────── ~100ms TOTAL (но использует 10 потоков!)


COROUTINES (2 потока):
User 1,2,3... (все параллельно)
Dispatchers.IO выполняет async/await
─────────────────── ~100ms TOTAL (использует 2 потока!)

🎯 Structured Concurrency - ключевая концепция

Problem without Structured Concurrency:

// ❌ ПЛОХО: запущенные корутины могут "утечь"
class OldWayService {
    fun loadDataInBackground(userId: String) {
        GlobalScope.launch { // ← НИКОГДА НЕ ОТМЕНЯЕТСЯ
            val user = fetchUser(userId)
            updateCache(user)
        }
    }
    
    // Service удаляется, но корутина продолжает работать!
}

Solution with Structured Concurrency:

// ✅ ХОРОШО: корутины контролируются через scope
class GoodWayService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun loadDataInBackground(userId: String) {
        launch { // привязано к scope
            val user = fetchUser(userId)
            updateCache(user)
        }
    }
    
    fun cleanup() {
        job.cancel() // отменяет ВСЕ корутины
    }
}

// Service удаляется → coroutineContext отменяется → все корутины останавливаются

Диаграмма Structured Concurrency:

Parent Scope (User service)
    │
    ├─ Coroutine 1: fetchUser ✓
    ├─ Coroutine 2: updateCache ✓
    └─ Coroutine 3: syncDB ✓
    
Service destroyed
    │
    └─ ALL coroutines automatically cancelled ✓

🏗️ Executor vs CoroutineScope: Comparison

// EXECUTOR APPROACH (old way)
class OldExecutorService {
    private val executor = Executors.newFixedThreadPool(10)
    
    fun processItem(item: Item) {
        executor.submit {
            try {
                val result = heavyComputation(item)
                handleResult(result)
            } catch (e: Exception) {
                log.error("Error", e)
            }
        }
    }
    
    // No graceful way to cancel all pending tasks
    fun shutdown() {
        executor.shutdown() // crude shutdown
    }
}

// COROUTINE APPROACH (modern way)
class ModernCoroutineService : CoroutineScope {
    override val coroutineContext = Dispatchers.Default + SupervisorJob()
    
    fun processItem(item: Item) {
        launch {
            try {
                val result = heavyComputation(item)
                handleResult(result)
            } catch (e: Exception) {
                log.error("Error", e)
            }
        }
    }
    
    // Graceful cancellation with cleanup
    suspend fun shutdown() {
        coroutineContext.job?.cancelAndJoin()
    }
}

⚠️ Common Pitfall: ThreadLocal + Coroutines

// ThreadLocal работает ПО ПОТОКАМ, а корутины выполняются в разных потоках!

val requestId = ThreadLocal<String>()

suspend fun badApproach() {
    requestId.set("req-123") // запустили в Main потоке
    
    withContext(Dispatchers.IO) { // ПЕРЕКЛЮЧАЕМСЯ В ДРУГОЙ ПОТОК!
        println(requestId.get()) // NULL! Данные потеряны!
    }
}

// ✅ РЕШЕНИЕ: Используйте CoroutineContext элементы вместо ThreadLocal

class RequestId(val id: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

suspend fun goodApproach() {
    val requestId = RequestId("req-123")
    
    withContext(Dispatchers.IO + requestId) { // передаём через контекст
        val id = coroutineContext[RequestId]?.id // "req-123" ✓
        println(id)
    }
}

📋 Чеклист: Готовность к корутинам


🎓 Ключевые выводы

  1. Корутины ~ 1,000x легче потоков - можно создавать миллионы
  2. Structured Concurrency - автоматическое управление жизненным циклом
  3. Non-blocking - приостановка без потери CPU
  4. Cooperative - корутины сами решают когда остановиться
  5. Context-aware - используют CoroutineContext вместо ThreadLocal
  6. Composable - можно комбинировать с Flow и другими структурами

Ключевые строительные блоки

Suspend функции

Suspend функция — это функция, которая может быть приостановлена и возобновлена позже без блокировки потока. Ключевое слово suspend указывает компилятору, что функция может приостанавливать выполнение в suspension points.

Базовые принципы

// suspend функция может вызывать другие suspend функции
suspend fun fetchUserData(id: String): UserData {
    val profile = fetchProfile(id)    // suspension point
    val settings = fetchSettings(id)  // suspension point
    return UserData(profile, settings)
}

// Обычная функция НЕ может вызывать suspend функции напрямую
fun normalFunction() {
    // fetchUserData("123") // ОШИБКА компиляции!
}

// Suspend функции можно вызывать только из:
// 1. Других suspend функций
// 2. Корутинных builders (launch, async)
// 3. runBlocking (только для тестов/main)

Что происходит под капотом:

  • Компилятор преобразует suspend функцию в state machine
  • В каждом suspension point функция может сохранить состояние и освободить поток
  • При возобновлении состояние восстанавливается и выполнение продолжается

📊 Suspension Points - визуально

EXECUTION FLOW
==============

suspend fun processUser(userId: String): Result {
    // STATE 0: Starting
    val profile = fetchProfile(userId)    ◄── SUSPENSION POINT 1
                                          (может приостановиться)
    
    // STATE 1: Profile loaded, resume here
    val settings = fetchSettings(userId)  ◄── SUSPENSION POINT 2
                                          (может приостановиться)
    
    // STATE 2: Settings loaded, resume here
    return process(profile, settings)
}


ЧТО ВИДИТ КОМПИЛЯТОР:
═════════════════════

suspend fun processUser(...): Continuation<Result>
    |
    ├─ switch(state) {
    │   case 0 → fetchProfile() (возвращает suspend)
    │   case 1 → fetchSettings() (возвращает suspend)
    │   case 2 → return process()
    │ }
    |
    └─ При suspend → сохраняем state и отпускаем поток
      При resume → восстанавливаем state и продолжаем

🔄 Coroutine Builders: Запуск корутин

launch - Fire and Forget

// launch НЕ возвращает результат, нужен только для side effects
val job: Job = scope.launch(Dispatchers.IO) {
    val userData = fetchUserData()
    updateDatabase(userData)  // side effect
    sendNotification(userData) // side effect
}

// Отмена
job.cancel()
job.join()  // ждём завершения
job.cancelAndJoin()  // отмена + ждём

// ⏱️ TIMING DIAGRAM
┌─────────────────────────────────┐
│ launch { ... } returns Job ─────→ code continues immediately!
│ ├─ Task executing in background │
│ ├─ Main code doesn't wait       │
│ └─ Fire and forget              │
└─────────────────────────────────┘

Когда использовать launch:

  • Side effects (логирование, кеширование, отправка событий)
  • Fire-and-forget операции
  • Параллельные фоновые задачи

async - Параллельное выполнение с результатом

// async возвращает Deferred<T>, нужно вызвать await()
val deferred: Deferred<String> = scope.async(Dispatchers.IO) {
    computeExpensiveValue()  // returns String
}

// Ждём результата
val result = deferred.await()

// ⏱️ TIMING DIAGRAM
┌──────────────────────────────────────┐
│ async { ... } returns Deferred ────→ code continues
│ ├─ Task executing in parallel       │
│ ├─ Later: result = deferred.await() │
│ │          ← BLOCKS until ready      │
│ └─ Get result when ready            │
└──────────────────────────────────────┘

Когда использовать async:

  • Нужен результат выполнения
  • Параллельные операции с await()
  • Комбинирование нескольких async результатов

❌ Частая ошибка: launch вместо async

// ❌ ПЛОХО: забываешь результат
suspend fun wrongApproach(): String {
    var result: String? = null
    
    launch { // launch НЕ ждёт!
        result = expensiveComputation()
    }
    
    return result ?: "default" // ВСЕГДА null! launch не ждёт завершения
}

// ✅ ХОРОШО: используй async + await
suspend fun correctApproach(): String = coroutineScope {
    val deferred = async { expensiveComputation() }
    return@coroutineScope deferred.await()
}

// ✅ ИЛИ: используй launch + лучшую структуру
suspend fun anotherGoodApproach(): String = coroutineScope {
    val result = async { expensiveComputation() }
    return@coroutineScope result.await()
}

runBlocking - Синхронный мост (ТОЛЬКО для тестов!)

// runBlocking БЛОКИРУЕТ текущий поток! НИКОГДА в production!
fun main() = runBlocking { // ← БЛОКИРУЕТ основной поток
    val user = fetchUserData() // suspend вызов
    println(user)
}

// ⚠️ TIMING DIAGRAM - runBlocking опасен!
Main thread:
┌─────────────────────────────────┐
│ runBlocking { ... }             │
│ ├─ Coroutine 1 executing        │
│ ├─ THREAD BLOCKED!              │ ← Весь поток замёрзнет!
│ └─ Processing...                │
└─────────────────────────────────┘
 0ms       1000ms      2000ms      3000ms

// ❌ НИКОГДА делайте так в production:
@RestController
class Controller {
    @GetMapping("/users/{id}")
    fun getUser(@PathVariable id: String): User {
        return runBlocking { // ← ВЕСЬ THREAD POOL ЗАМЁРЗНЕТ!
            userService.fetchUser(id)
        }
    }
}

📊 Dispatch Selection Guide

DISPATCHER CHARACTERISTICS
==========================

┌─────────────────────────────────────────────────────┐
│ Dispatchers.Main                                    │
├─────────────────────────────────────────────────────┤
│ • Назначение: UI операции (Android/Desktop)         │
│ • Pool size: 1 (single thread!)                     │
│ • Использование: updateUI(), setText()              │
│ • Производительность: MUST BE FAST!                 │
│ • Timeout: <16ms (для 60fps)                        │
│ • Пример:                                           │
│   withContext(Dispatchers.Main) {                   │
│       binding.userText.text = user.name             │
│   }                                                 │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│ Dispatchers.IO                                      │
├─────────────────────────────────────────────────────┤
│ • Назначение: I/O операции (сеть, файлы, БД)       │
│ • Pool size: 64+ потоков (expandable!)              │
│ • Использование: API calls, DB queries              │
│ • Производительность: OPTIMIZED FOR BLOCKING       │
│ • Timeout: Few seconds OK                          │
│ • Пример:                                           │
│   withContext(Dispatchers.IO) {                     │
│       userRepository.findById(id)                   │
│   }                                                 │
└─────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────┐
│ Dispatchers.Default                                 │
├─────────────────────────────────────────────────────┤
│ • Назначение: CPU-intensive операции                │
│ • Pool size: CPU cores (fixed!)                     │
│ • Использование: Sorting, Parsing, Encryption      │
│ • Производительность: OPTIMIZED FOR CPU            │
│ • Timeout: Variable (depends on algorithm)          │
│ • Пример:                                           │
│   withContext(Dispatchers.Default) {                │
│       expensiveJsonParsing(jsonString)              │
│   }                                                 │
└─────────────────────────────────────────────────────┘

DISPATCHER SELECTION DECISION TREE
═══════════════════════════════════

Что делаешь?
│
├─ I/O операция (БД, HTTP, файлы)?
│  └─ Dispatchers.IO
│
├─ CPU операция (сортировка, парсинг, шифрование)?
│  └─ Dispatchers.Default
│
├─ UI операция (обновление экрана)?
│  └─ Dispatchers.Main
│
└─ Уникальный случай?
   └─ Dispatchers.Unconfined (very rare!)

💡 Практический пример: Параллельная загрузка

// ⏱️ TIMING DIAGRAM
Sequential (неправильно):
┌─────────────────────────────────────┐
│ fetchUser()  ▓▓▓ 1000ms             │
│ fetchOrders()    ▓▓▓ 1000ms         │
│ fetchSettings()      ▓▓▓ 1000ms     │
│ Total: 3000ms                       │
└─────────────────────────────────────┘

Parallel (правильно):
┌─────────────────┐
│ fetchUser() ▓▓▓ │ 1000ms
│ fetchOrders() ▓▓ │ 1000ms (parallel!)
│ fetchSettings() ▓ │ 1000ms (parallel!)
│ Total: 1000ms   │
└─────────────────┘


ВОТ КОД:

// ❌ ПЛОХО: последовательно
suspend fun slowDashboard(userId: String): Dashboard {
    val user = userService.getUser(userId)     // 1000ms
    val orders = orderService.getOrders(userId) // 1000ms
    val prefs = prefService.getPrefs(userId)    // 1000ms
    
    return Dashboard(user, orders, prefs) // Итого: 3000ms ❌
}

// ✅ ХОРОШО: параллельно
suspend fun fastDashboard(userId: String): Dashboard = coroutineScope {
    val userAsync = async { userService.getUser(userId) }     // 1000ms
    val ordersAsync = async { orderService.getOrders(userId) } // 1000ms
    val prefsAsync = async { prefService.getPrefs(userId) }    // 1000ms
    
    // awaitAll ждёт всех результатов
    return@coroutineScope Dashboard(
        user = userAsync.await(),
        orders = ordersAsync.await(),
        prefs = prefsAsync.await()
    ) // Итого: 1000ms ✅
}

withContext() - Переключение контекста

// Переключаемся между dispatchers без создания новой корутины

suspend fun complexOperation() = withContext(Dispatchers.IO) {
    val data = loadFromNetwork() // в Dispatchers.IO потоке
    
    withContext(Dispatchers.Default) {
        val processed = expensiveProcessing(data) // в Default потоке
    }
    
    withContext(Dispatchers.Main) {
        displayResult(processed) // в Main потоке
    }
}

// ⏱️ TIMING: All in SAME coroutine, different threads
Main thread  ────┐
               │
IO threads    ──│── loadFromNetwork
Default threads ├── expensiveProcessing
Main thread  ──┘
               └─ displayResult

🛡️ CoroutineScope - управление жизненным циклом

// ❌ ПЛОХО: GlobalScope - утечка памяти
class UserService {
    fun loadData(userId: String) {
        GlobalScope.launch { // НИКОГДА не отменяется!
            val data = fetchUserData(userId)
            updateCache(data)
        }
    }
}

// ✅ ХОРОШО: управляемый CoroutineScope
class UserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun loadData(userId: String) {
        launch { // привязано к scope
            val data = fetchUserData(userId)
            updateCache(data)
        }
    }
    
    fun cleanup() {
        job.cancel() // отменяет ВСЕ корутины
    }
}

// Android пример (ViewModel)
class UserViewModel : ViewModel() {
    fun loadData(userId: String) {
        viewModelScope.launch { // автоматически отменяется в onCleared()
            val user = userService.getUser(userId)
            _state.value = UiState.Success(user)
        }
    }
    
    // viewModelScope.cancel() вызывается автоматически!
}

📋 Чеклист: Правильное использование builders


🎓 Ключевые выводы

  1. launch = fire-and-forget для side effects
  2. async = параллельное выполнение с результатом (await())
  3. Dispatcher = выбираем по типу операции (IO/Default/Main)
  4. runBlocking = ТОЛЬКО для тестов и main()!
  5. CoroutineScope = контролируем жизненный цикл
  6. GlobalScope = антипаттерн, вызывает утечки
  7. withContext() = переключение контекста без новой корутины

Structured Concurrency

📊 Иерархия корутин - диаграмма

STRUCTURED CONCURRENCY HIERARCHY
=================================

Parent Scope (root)
    ├─ Context: Dispatchers.IO + SupervisorJob()
    │
    ├─ Child Job 1 ✓ Success
    │   └─ завершена нормально
    │
    ├─ Child Job 2 ✗ Exception
    │   └─ упала с ошибкой
    │
    └─ Child Job 3 → Cancelled
        └─ отменена при ошибке Job 2


ОБЫЧНЫЙ JOB (ERROR PROPAGATION)
────────────────────────────────
                    Parent Job
                        ▲
                        │ (CancellationException)
                        │
        ┌───────────────┼───────────────┐
        │               │               │
    Child 1 ✓       Child 2 ✗       Child 3 ❌
    Success      throws error    cancelled!
    
    Результат: ОДИН УПАЛ - ВСЕ УПАЛИ!


SUPERVISORJOB (ERROR ISOLATION)
────────────────────────────────
                  Parent Supervisor
                         │
        ┌────────────────┼────────────────┐
        │                │                │
    Child 1 ✓        Child 2 ✗        Child 3 ✓
    Success      error isolated      continues!
    
    Результат: ОДИН УПАЛ - ОСТАЛЬНЫЕ РАБОТАЮТ!

🎯 Когда использовать coroutineScope vs supervisorScope

DECISION TREE
=============

Нужно гарантировать успех ВСЕ или НИЧЕГО?
│
├─ ✅ ДА (критичные данные, транзакция, платёж)
│   └─ coroutineScope {
│        val result1 = async { criticalOp1() }  // если упадёт
│        val result2 = async { criticalOp2() }  // отменяет result2
│        val result3 = async { criticalOp3() }  // отменяет result3
│       }
│
└─ ❌ НЕТ (независимые операции, dashboard, сбор данных)
    └─ supervisorScope {
         val profile = async { loadProfile() }  // если упадёт
         val orders = async { loadOrders() }    // продолжит работу
         val prefs = async { loadPrefs() }      // продолжит работу
        }


ПРАКТИЧЕСКИЕ ПРИМЕРЫ
====================

Финансовая транзакция (coroutineScope - "ВСЕ или НИЧЕГО"):
────────────────────────────────────────────────────────
suspend fun processPayment(payment: Payment) = coroutineScope {
    val validation = async { validatePayment(payment) }
    val authorization = async { authorizeCard(payment) }
    val processing = async { processTransaction(payment) }
    
    // Если любой шаг упадёт - вся транзакция отменяется
    val results = awaitAll(validation, authorization, processing)
    return@coroutineScope PaymentResult(results)
}


Загрузка Dashboard (supervisorScope - "ЧАСТИЧНЫЙ УСПЕХ"):
──────────────────────────────────────────────────────────
suspend fun loadDashboard(userId: String) = supervisorScope {
    val weather = async {
        try { weatherService.get() }
        catch (e: Exception) { null } // fallback
    }
    val news = async {
        try { newsService.get() }
        catch (e: Exception) { emptyList() }
    }
    val stocks = async {
        try { stockService.get(userId) }
        catch (e: Exception) { emptyList() }
    }
    
    // Показываем то что удалось загрузить
    return@supervisorScope Dashboard(
        weather = weather.await(),
        news = news.await(),
        stocks = stocks.await()
    )
}


Микросервисный шлюз (supervisorScope - независимые сервисы):
─────────────────────────────────────────────────────────────
suspend fun fetchDashboardData(userId: String) = supervisorScope {
    val user = async {
        try { userService.getUser(userId) }
        catch (e: Exception) { null }
    }
    val orders = async {
        try { orderService.getOrders(userId) }
        catch (e: Exception) { emptyList() }
    }
    val notifications = async {
        try { notificationService.get(userId) }
        catch (e: Exception) { emptyList() }
    }
    
    // Один упавший сервис не ломает весь dashboard
    DashboardData(
        user = user.await(),
        orders = orders.await(),
        notifications = notifications.await()
    )
}

🔄 Сравнение Job типов

Аспект Job SupervisorJob
Ошибка дочерней Отменяет всех siblings Изолирует от siblings
Родитель ждёт Да, всех завершения Да, всех завершения
Отмена родителя Отменяет всех детей Отменяет всех детей
Use case Критичные операции Независимые операции
Отмена одного child Отменяет остальных НЕ влияет на остальных

💡 Комбинированный подход

// Часто используется комбинация coroutineScope + supervisorScope

class OrderProcessingService : CoroutineScope {
    private val supervisorJob = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + supervisorJob
    
    suspend fun processOrder(order: Order): OrderResult = supervisorScope {
        // КРИТИЧНАЯ часть - strict иерархия
        val coreResult = async {
            coroutineScope { // ВСЕ или НИЧЕГО
                val validation = async { validateOrder(order) }
                val inventory = async { reserveInventory(order) }
                val payment = async { processPayment(order) }
                
                CoreOrderData(
                    validation.await(),
                    inventory.await(),
                    payment.await()
                )
            }
        }
        
        // ДОПОЛНИТЕЛЬНЫЕ сервисы - graceful degradation
        val notifications = async {
            try { sendNotification(order) }
            catch (e: Exception) { false }
        }
        val analytics = async {
            try { trackOrderEvent(order) }
            catch (e: Exception) { false }
        }
        
        OrderResult(
            core = coreResult.await(),
            notified = notifications.await(),
            tracked = analytics.await()
        )
    }
}

🛡️ Обработка ошибок в структурированной конкуренции

// Паттерн: Exception Handler + SupervisorJob + Try-Catch

val scope = CoroutineScope(
    Dispatchers.IO +
    SupervisorJob() +
    CoroutineExceptionHandler { _, exception ->
        log.error("Root exception handler", exception)
        metricsService.recordException(exception)
    }
)

scope.launch {
    supervisorScope {
        val task1 = async {
            try { riskyOp1() }
            catch (e: SpecificException) {
                log.warn("Task 1 failed", e)
                null // fallback
            }
        }
        
        val task2 = async {
            try { riskyOp2() }
            catch (e: SpecificException) {
                log.warn("Task 2 failed", e)
                null // fallback
            }
        }
        
        val task3 = async {
            try { riskyOp3() }
            catch (e: Exception) {
                log.error("Task 3 critical failure", e)
                throw e // propagate critical errors
            }
        }
        
        try {
            val results = awaitAll(task1, task2, task3)
            processResults(results)
        } catch (e: CancellationException) {
            cleanup()
            throw e // ALWAYS rethrow
        }
    }
}

📋 Чеклист Structured Concurrency

  • Используете ли вы GlobalScope? → ЗАМЕНИТЕ на CoroutineScope
  • Отменяются ли корутины при завершении scope? → Убедитесь в
  • Обработаны ли исключения? → Добавьте
  • Правильный Job тип? → Job для "всё или ничего", SupervisorJob
  • Проверяется ли isActive
  • Закрываются ли ресурсы? → Используйте try-finally

🚀 Практические паттерны

Паттерн 1: Graceful Shutdown

class ServiceManager {
    private val scope = CoroutineScope(
        Dispatchers.IO + SupervisorJob() +
        CoroutineName("ServiceManager")
    )
    
    fun startServices() {
        scope.launch {
            launch { backgroundCleanup() }
            launch { reportingTask() }
            launch { healthCheckTask() }
        }
    }
    
    suspend fun shutdown() {
        scope.coroutineContext[Job]?.let { job ->
            log.info("Shutting down ServiceManager...")
            job.cancel()
            try {
                withTimeout(30_000) {
                    job.join() // wait for graceful shutdown
                }
            } catch (e: TimeoutCancellationException) {
                log.warn("Shutdown timeout, forcing termination")
            }
        }
    }
}

Паттерн 2: Batch Processing

suspend fun processBatchIndependently(items: List<Item>) = supervisorScope {
    val results = items.map { item ->
        async {
            try {
                processItem(item)
            } catch (e: Exception) {
                log.error("Failed to process item ${item.id}", e)
                ProcessingResult.Failed(item.id, e)
            }
        }
    }.awaitAll()
    
    val successful = results.filterIsInstance<ProcessingResult.Success>()
    val failed = results.filterIsInstance<ProcessingResult.Failed>()
    
    log.info("Batch processing: ${successful.size} success, ${failed.size} failed")
    return@supervisorScope BatchResult(successful, failed)
}

Отмена и обработка исключений

Механизм отмены корутин

Отмена корутин — это кооперативный процесс, где корутина добровольно проверяет свой статус и прекращает работу. В отличие от Thread.interrupt(), отмена корутин безопасна и предсказуема.

Ключевые принципы отмены

  • Кооперативность — корутина сама решает, когда остановиться
  • Безопасность — отмена не прерывает операцию посередине
  • Структурированность — отмена распространяется по иерархии
  • Исключение как сигнал — CancellationException указывает на отмену

📊 Распространение исключений: launch vs async

LAUNCH - исключение сразу всплывает вверх
═══════════════════════════════════════════

scope.launch {
    try {
        child1Launch() // ← throws Exception
    } catch (e: Exception) {
        // Обработано
    }
}

РЕЗУЛЬТАТ: Исключение всплывает СРАЗУ!
        │
        ├─ CoroutineExceptionHandler (если есть)
        │
        └─ Parent Job cancelled ❌


ASYNC - исключение заморожено до await()
═════════════════════════════════════════

val deferred = scope.async {
    try {
        child1Async() // ← throws Exception
    } catch (e: Exception) {
        // Обработано
    }
}

// Исключение ЗАМОРОЖЕНО в Deferred
val result = deferred.await() // ← Exception выбросится ТУТ!

РЕЗУЛЬТАТ: Исключение скрыто до await()
        │
        ├─ Можно обработать в try-catch
        │
        └─ Parent НЕ затронут!

🎯 Способы отмены корутин

val job = launch {
    repeat(1000) { i ->
        println("Работаю: $i")
        delay(500) // suspension point - место для проверки отмены
    }
}

// СПОСОБ 1: Простая отмена
job.cancel()                    // Отменить (некоопера тивно)

// СПОСОБ 2: Отмена с сообщением
job.cancel(CancellationException("Превышен таймаут"))

// СПОСОБ 3: Отмена + ожидание завершения
job.cancelAndJoin()             // отмена + join()


// ⏱️ TIMING DIAGRAM

┌──────────────────────────────┐
│ launch { repeat(1000) }      │
├──────────────────────────────┤
│ Iteration 1 ▓▓ 500ms         │
│ Iteration 2 ▓▓ 500ms         │
│ job.cancel() called ─────────┼─→ CancellationException thrown
│ Iteration 3 [CANCELLED]      │
│ Cleanup                      │
└──────────────────────────────┘

✅ Проверка статуса отмены

Способ 1: ensureActive() - строгий контроль

suspend fun processingWithEnsureActive() {
    repeat(1000) { iteration ->
        // Throws CancellationException если отменена!
        ensureActive()
        
        doWork(iteration)
        delay(100)
    }
}

// Пример с обработкой
suspend fun safeProcessing() {
    try {
        repeat(1000) { iteration ->
            ensureActive() // ← бросает CancellationException
            doWork(iteration)
        }
    } catch (e: CancellationException) {
        cleanup()
        throw e // ВСЕГДА перебросить!
    }
}

Способ 2: isActive - мягкая проверка

suspend fun processingWithIsActive() {
    repeat(1000) { iteration ->
        if (!isActive) { // мягкая проверка
            cleanup()
            return // выход без исключения
        }
        
        doWork(iteration)
        delay(100)
    }
}

// Пример с логированием
suspend fun gracefulProcessing() {
    repeat(1000) { iteration ->
        if (!isActive) {
            log.info("Cancelled at iteration $iteration")
            return
        }
        doWork(iteration)
    }
}

Разница между ensureActive и isActive

Аспект ensureActive() isActive
Тип проверки Strict Soft
При отмене throws CancellationException возвращает false
Использование Когда нужна обработка исключения Когда нужна логика выхода
Performance Немного медленнее Быстро
Рекомендация Для critial sections Для loops/iterations

🚨 Правильная обработка CancellationException

❌ ПЛОХО: логирование CancellationException как ошибки

// ❌ НЕПРАВИЛЬНО!
suspend fun badErrorHandling() {
    try {
        longRunningOperation()
    } catch (e: Exception) {
        log.error("Operation failed", e) // логируем отмену как ошибку!
    }
}

// Результат: В логах увидим кучу CancellationException,
// хотя это нормальная часть жизненного цикла

✅ ХОРОШО: правильная обработка

// ✅ ПРАВИЛЬНО!
suspend fun goodErrorHandling() {
    try {
        longRunningOperation()
    } catch (e: CancellationException) {
        log.info("Operation cancelled")
        cleanup()
        throw e // ВСЕГДА перебросить CancellationException!
    } catch (e: Exception) {
        log.error("Operation failed", e)
        throw e
    }
}

// ИЛИ более краткий вариант:
suspend fun goodErrorHandlingShort() {
    try {
        longRunningOperation()
    } finally {
        cleanup() // выполнится в обоих случаях
    }
}

💡 Practical patterns: Circuit Breaker

class CircuitBreakerService {
    private var failureCount = 0
    private var lastFailureTime = 0L
    private val circuitBreakerTimeout = 30_000L
    
    suspend fun <T> executeWithCircuitBreaker(
        operationName: String,
        block: suspend () -> T
    ): T {
        if (isCircuitOpen()) {
            throw CircuitBreakerOpenException(
                "$operationName is temporarily unavailable"
            )
        }
        
        return try {
            val result = withTimeout(5000) { // 5 second timeout
                block()
            }
            resetFailureCount()
            result
        } catch (e: TimeoutCancellationException) {
            recordFailure()
            throw ServiceTimeoutException("$operationName timed out", e)
        } catch (e: Exception) {
            recordFailure()
            throw e
        }
    }
    
    private fun isCircuitOpen(): Boolean {
        return failureCount >= 5 &&
               (System.currentTimeMillis() - lastFailureTime) < circuitBreakerTimeout
    }
    
    private fun recordFailure() {
        failureCount++
        lastFailureTime = System.currentTimeMillis()
    }
    
    private fun resetFailureCount() {
        failureCount = 0
    }
}

// Использование
val breaker = CircuitBreakerService()

suspend fun loadData(userId: String): UserData {
    return breaker.executeWithCircuitBreaker("userService") {
        apiClient.getUser(userId) // будет отменена если > 5s
    }
}

📊 Сравнение Job vs SupervisorJob

EXCEPTION PROPAGATION
═════════════════════

ОБЫЧНЫЙ JOB (STRICT)
────────────────────
Parent Job
    ├─ Child 1: async { ... }
    ├─ Child 2: async { ... } ← throws Exception
    └─ Child 3: async { ... }

Результат:
    ├─ Child 2 упал с Exception ❌
    ├─ Child 1 ОТМЕНЕНА ❌❌
    ├─ Child 3 ОТМЕНЕНА ❌❌
    └─ Parent ОТМЕНЁН ❌❌

Все упали!


SUPERVISORJOB (LENIENT)
──────────────────────
Parent SupervisorJob
    ├─ Child 1: async { ... } ✓
    ├─ Child 2: async { ... } ← throws Exception ❌
    └─ Child 3: async { ... } ✓

Результат:
    ├─ Child 2 упал с Exception ❌ (изолирован!)
    ├─ Child 1 продолжает работу ✓
    ├─ Child 3 продолжает работу ✓
    └─ Parent продолжает работу ✓

Остальные работают!
Аспект Job SupervisorJob
Ошибка дочерней Отменяет всех siblings Не влияет на других
Отмена родителя Отменяет всех детей Отменяет всех детей
Parentчка ждёт Да, завершения всех Да, завершения всех
Use case Транзакции, critical ops Dashboard, batch ops

🛡️ Practical patterns: Retry with Exponential Backoff

suspend inline fun <T> withRetry(
    times: Int = 3,
    initialDelay: Long = 100,
    maxDelay: Long = 10000,
    factor: Double = 2.0,
    shouldRetry: (Exception) -> Boolean = { it is IOException },
    crossinline block: suspend () -> T
): T {
    var currentDelay = initialDelay
    var lastException: Exception? = null
    
    repeat(times - 1) { attempt ->
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // НИКОГДА не ретраим отмену!
        } catch (e: Exception) {
            if (!shouldRetry(e)) throw e
            
            lastException = e
            log.warn("Attempt ${attempt + 1}/$times failed, " +
                    "retrying in ${currentDelay}ms", e)
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
        }
    }
    
    return try {
        block() // последняя попытка
    } catch (e: Exception) {
        throw lastException ?: e
    }
}

// Использование
suspend fun reliableApiCall(url: String): String {
    return withRetry(
        times = 3,
        initialDelay = 100,
        shouldRetry = { it is IOException || it is TimeoutException }
    ) {
        httpClient.get(url)
    }
}

🎯 Graceful shutdown с обработкой отмены

class DataProcessor : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    private val _state = MutableStateFlow<State>(State.IDLE)
    val state = _state.asStateFlow()
    
    fun start() {
        launch {
            repeat(Int.MAX_VALUE) { iteration ->
                try {
                    ensureActive() // проверка отмены
                    
                    val data = fetchBatch()
                    processBatch(data)
                    
                    _state.value = State.PROCESSING(iteration)
                    delay(1000) // work interval
                    
                } catch (e: CancellationException) {
                    log.info("Processing cancelled at iteration $iteration")
                    _state.value = State.CANCELLED
                    saveProgress(iteration)
                    throw e
                }
            }
        }
    }
    
    suspend fun gracefulShutdown(timeout: Long = 30_000) {
        log.info("Starting graceful shutdown...")
        _state.value = State.SHUTTING_DOWN
        
        try {
            withTimeout(timeout) {
                job.cancelAndJoin()
            }
        } catch (e: TimeoutCancellationException) {
            log.error("Shutdown timeout, forcing termination")
            job.cancel()
        }
        
        log.info("Shutdown complete")
    }
}

// Использование
val processor = DataProcessor()

runtime.addShutdownHook(Thread {
    runBlocking {
        processor.gracefulShutdown(timeout = 30_000)
    }
})

📋 Чеклист: Обработка отмены


🎓 Ключевые выводы

  1. CancellationException - не ошибка, перебросьте её!
  2. ensureActive() - для строгого контроля в critical sections
  3. isActive - для graceful exit из циклов
  4. Job - для "всё или ничего" операций
  5. SupervisorJob - для независимых операций
  6. try-finally - для гарантированного cleanup
  7. Retry с backoff - для нестабильных операций
  8. Graceful shutdown - обязателен в production

CoroutineContext

Что такое CoroutineContext?

CoroutineContext — это индексированный набор элементов, который определяет окружение выполнения корутины. Это immutable коллекция, где каждый элемент имеет уникальный ключ (Key) и может быть получен по этому ключу.

Основные принципы:

  • Композиция — элементы объединяются через оператор +
  • Наследование — дочерние корутины наследуют контекст родителя
  • Переопределение — новые элементы заменяют старые с тем же ключом
  • Неизменяемость — создаётся новый контекст при изменениях
// Базовая структура CoroutineContext
val context: CoroutineContext =
    Dispatchers.IO +                    // где выполняется
            SupervisorJob() +                   // управление жизненным циклом  
            CoroutineName("ApiService") +       // имя для отладки
            CoroutineExceptionHandler { _, ex -> // обработка исключений
                log.error("Unhandled exception", ex)
            }

📊 Структура и композиция CoroutineContext

CoroutineContext = Indexed Set of Elements
═══════════════════════════════════════════

context: CoroutineContext
    │
    ├─ Dispatcher (Key: ContinuationInterceptor)
    │  ├─ Value: Dispatchers.IO
    │  └─ Определяет: где выполняется корутина
    │
    ├─ Job (Key: Job)
    │  ├─ Value: SupervisorJob()
    │  └─ Определяет: иерархия и жизненный цикл
    │
    ├─ CoroutineName (Key: CoroutineName)
    │  ├─ Value: "ApiService"
    │  └─ Определяет: имя для отладки
    │
    └─ CoroutineExceptionHandler (Key: CoroutineExceptionHandler)
       ├─ Value: { _, ex -> log.error(...) }
       └─ Определяет: обработка необработанных исключений


КОМПОЗИЦИЯ ЧЕРЕЗ ОПЕРАТОР +
═════════════════════════════

val dispatcher = Dispatchers.IO
val job = SupervisorJob()
val name = CoroutineName("DataFetcher")

val context1 = dispatcher + job
val context2 = context1 + name

// Результат: context2 содержит все три элемента!

// Переопределение:
val contextv2 = context2 + Dispatchers.Default
// ↑ заменяет Dispatchers.IO на Dispatchers.Default

🔄 Наследование контекста

PARENT-CHILD CONTEXT INHERITANCE
═════════════════════════════════

Parent Coroutine
    context: [Dispatchers.IO, Job1, CoroutineName("Parent")]
    │
    ├─ launch { } // наследует контекст родителя
    │   context: [Dispatchers.IO, Job1, CoroutineName("Parent")]
    │   ├─ ChildJob = Job() (новый Job из Parent context!)
    │   └─ Dispatcher, Name наследованы
    │
    └─ launch(Dispatchers.Default) { } // переопределение
        context: [Dispatchers.Default, Job1, CoroutineName("Parent")]
        └─ Dispatcher переопределён, Job наследован


ПЕРЕОПРЕДЕЛЕНИЕ В ВЛОЖЕННЫХ КОРУТИНАХ
═══════════════════════════════════════

fun parentScope() = CoroutineScope(
    Dispatchers.IO + 
    CoroutineName("ParentScope")
)

parentScope().launch {
    // context = [Dispatchers.IO, CoroutineName("ParentScope")]
    
    launch(CoroutineName("ChildTask")) {
        // context = [Dispatchers.IO, CoroutineName("ChildTask")]
        // ↑ Name переопределена!
    }
    
    withContext(Dispatchers.Default) {
        // context = [Dispatchers.Default, CoroutineName("ParentScope")]
        // ↑ Dispatcher переопределён!
    }
}

📋 Основные элементы CoroutineContext

Dispatcher - где выполняется

// DISPATCHER LAYER IN CONTEXT

val ioContext = Dispatchers.IO
// Key: ContinuationInterceptor
// Value: CoroutineDispatcher (IO pool)

val context = Dispatchers.IO + SupervisorJob()
val dispatcher = context[ContinuationInterceptor]

// Использование:
launch(context) {
    // выполнится в Dispatchers.IO потоке
}

withContext(Dispatchers.Default) {
    // переключится в Default поток (CPU cores)
}

Job - управление жизненным циклом

// JOB LAYER IN CONTEXT

val parentJob = SupervisorJob()
val context = parentJob + CoroutineName("Main")

scope.launch(context) {
    val currentJob = coroutineContext[Job] ✓
    // currentJob заменяется на Job() для этой корутины!
    
    println(currentJob.isActive)
}

// Отмена через context:
context.cancel() // отменяет все корутины с этим контекстом

CoroutineName - для отладки

// COROUTINE NAME FOR DEBUGGING

val context = CoroutineName("DataFetcher")

launch(context) {
    val name = coroutineContext[CoroutineName]?.name
    println(name) // "DataFetcher"
}

// Использование в логах:
class LoggingInterceptor {
    fun log(msg: String) {
        val coroutineName = coroutineContext[CoroutineName]?.name ?: "unknown"
        logger.info("[$coroutineName] $msg")
    }
}

CoroutineExceptionHandler - обработка исключений

// EXCEPTION HANDLER IN CONTEXT

val exceptionHandler = CoroutineExceptionHandler { _, exception ->
    log.error("Unhandled coroutine exception", exception)
    metricsService.recordException(exception)
}

val context = SupervisorJob() + exceptionHandler

scope.launch(context) {
    throw RuntimeException("Oops!")
    // → exceptionHandler будет вызван
}

// ВАЖНО: ExceptionHandler только для launch()
scope.async(context) {
    throw RuntimeException("Oops!")
    // → async НЕ вызывает handler, исключение идёт в await()
}

🛡️ Кастомные элементы контекста

// Пример 1: Request ID для логирования

class RequestId(val id: String) : 
    AbstractCoroutineContextElement(RequestId) {
    
    companion object Key : CoroutineContext.Key<RequestId>
}

suspend fun complexOperation(requestId: String) {
    withContext(RequestId(requestId)) {
        service1()      // может получить requestId из контекста
        service2()      // может получить requestId из контекста
    }
}

// Использование:
fun getRequestId(): String? {
    return coroutineContext[RequestId]?.id
}

// Пример 2: Custom Dispatcher для ограничения параллелизма

val limitedIO = Dispatchers.IO.limitedParallelism(4)
val context = limitedIO + CoroutineName("LimitedService")

launch(context) {
    // максимум 4 одновременные операции
}

// Пример 3: Metrics контекст

class MetricsContext(val serviceName: String) :
    AbstractCoroutineContextElement(MetricsContext) {
    
    companion object Key : CoroutineContext.Key<MetricsContext>
    
    fun recordTime(operationName: String, duration: Long) {
        meterRegistry.timer("$serviceName.$operationName")
            .record(duration, TimeUnit.MILLISECONDS)
    }
}

val metricsContext = MetricsContext("UserService")

launch(metricsContext) {
    val start = System.currentTimeMillis()
    try {
        fetchUser()
    } finally {
        val duration = System.currentTimeMillis() - start
        coroutineContext[MetricsContext]?.recordTime("fetchUser", duration)
    }
}

📊 Получение элементов из контекста

// Способ 1: Прямое получение по Key

val context = Dispatchers.IO + CoroutineName("Service")

val dispatcher = context[ContinuationInterceptor]
val name = context[CoroutineName]

println(dispatcher) // Dispatchers.IO
println(name?.name) // "Service"

// Способ 2: Из coroutineContext suspend функции

suspend fun myFunction() {
    val currentJob = coroutineContext[Job]
    val currentName = coroutineContext[CoroutineName]?.name
    val currentDispatcher = coroutineContext[ContinuationInterceptor]
    
    println("Job: $currentJob")
    println("Name: $currentName")
    println("Dispatcher: $currentDispatcher")
}

// Способ 3: Все элементы контекста

suspend fun printAllContextElements() {
    coroutineContext.forEach { element ->
        println("Element: ${element.key} = ${element.value}")
    }
}

⚠️ Осторожно: ThreadLocal vs CoroutineContext

// ❌ ПЛОХО: ThreadLocal НЕ работает с корутинами!

val userId = ThreadLocal<String>()

suspend fun badApproach() {
    userId.set("user-123")
    
    withContext(Dispatchers.IO) { // ПЕРЕКЛЮЧЕНИЕ ПОТОКА!
        println(userId.get()) // NULL! Данные потеряны!
    }
}

// ✅ ХОРОШО: Используйте CoroutineContext элементы

class UserId(val value: String) :
    AbstractCoroutineContextElement(UserId) {
    
    companion object Key : CoroutineContext.Key<UserId>
}

suspend fun goodApproach() {
    withContext(UserId("user-123")) {
        // Any dispatcher!
        val userId = coroutineContext[UserId]?.value
        println(userId) // "user-123" ✓ ВСЕГДА работает!
    }
}

// ✅ ДАЖЕ ЛУЧШЕ: Использование с nested calls

suspend fun complexCall(userId: String) {
    withContext(UserId(userId)) {
        service1()  // может получить userId
        service2()  // может получить userId
    }
}

suspend fun service1() {
    // Получение userId из контекста
    val userId = coroutineContext[UserId]?.value
    log.info("Service1 called for user: $userId")
}

🎯 Практический пример: RequestContext для микросервисов

data class RequestContext(
    val requestId: String,
    val userId: String,
    val traceId: String
) : AbstractCoroutineContextElement(RequestContext) {
    
    companion object Key : CoroutineContext.Key<RequestContext>
}

// Middleware
class CoroutineContextMiddleware {
    suspend fun <T> processRequest(
        request: HttpRequest,
        handler: suspend () -> T
    ): T {
        val context = RequestContext(
            requestId = UUID.randomUUID().toString(),
            userId = request.header("X-User-Id") ?: "anonymous",
            traceId = request.header("X-Trace-Id") ?: UUID.randomUUID().toString()
        )
        
        return withContext(context) {
            handler()
        }
    }
}

// Services могут получить контекст
class UserService {
    suspend fun getUser(userId: String): User {
        val context = coroutineContext[RequestContext]
        log.info("[${context?.requestId}] Getting user $userId")
        
        return withContext(Dispatchers.IO) {
            // RequestContext автоматически наследуется!
            repository.findById(userId)
        }
    }
}

// Logging interceptor
class ContextAwareLogger {
    fun info(message: String) {
        val requestId = coroutineContext[RequestContext]?.requestId ?: "-"
        val userId = coroutineContext[RequestContext]?.userId ?: "-"
        
        logger.info("[$requestId][$userId] $message")
    }
}

📋 Чеклист: Работа с CoroutineContext


🎓 Ключевые выводы

  1. CoroutineContext = индексированный набор элементов
  2. Композиция через оператор +
  3. Наследование - дочерние наследуют родительский контекст
  4. Переопределение - новые элементы заменяют старые
  5. Dispatcher - где выполняется корутина
  6. Job - управление жизненным циклом
  7. CoroutineName - для отладки
  8. ExceptionHandler - обработка необработанных исключений
  9. Кастомные элементы - для application-specific данных
  10. CoroutineContext вместо ThreadLocal - в корутинах!

Kotlin Flow

📊 Flow vs RxJava vs Reactive Streams

COMPARISON MATRIX
=================

                FLOW         RxJava      Reactor     SharedFlow
┌──────────────────────────────────────────────────────────────┐
│ Холодный/Горячий     COLD        COLD/HOT    COLD/HOT    HOT │
│ Платформа       Multiplatform  Android    JVM/Java   Kotlin  │
│ Thread safety    suspend-safe   safe       safe       safe    │
│ Простота        ★★★★★         ★★★        ★★         ★★★     │
│ Производительность  ★★★★      ★★★★       ★★★★      ★★★★    │
│ Use case       Kotlin apps   Android    Reactive   Events   │
└──────────────────────────────────────────────────────────────┘

ХОЛОДНЫЙ ПОТОК (FLOW)
═════════════════════

User₁ .collect()           User₂ .collect()
    ▼                          ▼
┌─────────────────┐        ┌─────────────────┐
│ Producer starts │        │ Producer starts │
│ (fresh code)    │        │ (fresh code)    │
│ emit 1, 2, 3    │        │ emit 1, 2, 3    │
└─────────────────┘        └─────────────────┘

Каждый подписчик запускает производителя заново!


ГОРЯЧИЙ ПОТОК (SHAREDFLOW)
══════════════════════════

Producer (runs once)
    │
    emit 1, 2, 3
    │
    ├─→ User₁ sees: 2, 3 (пропустил 1)
    ├─→ User₂ sees: 3 (пропустили 1, 2)
    └─→ User₃ sees: 1, 2, 3 (использует replay)

Один производитель для всех подписчиков!


СОСТОЯНИЕ (STATEFLOW)
════════════════════

[State: 42]
    │
    ├─→ User₁ .collect() → получает 42 сразу
    ├─→ User₂ .collect() → получает 42 сразу
    └─→ State changes to 99
        ├─→ User₁ → получает 99
        └─→ User₂ → получает 99

Новые подписчики получают ПОСЛЕДНЕЕ значение!

🔄 Flow операторы - визуальная цепочка

SOURCE FLOW DATA TRANSFORMATION PIPELINE
=========================================

Source:  1, 2, 3, 4, 5
             │
             ▼
        ┌─────────────┐
        │   filter    │  Исключить нечётные
        │  { it % 2 } │
        └─────────────┘
             │
            ▼
          2, 4
             │
             ▼
        ┌──────────────┐
        │     map      │  Преобразовать в строки
        │  { "N: $it" }│
        └──────────────┘
             │
            ▼
       "N: 2", "N: 4"
             │
             ▼
        ┌──────────────┐
        │   flatMap    │  Раскрыть каждое значение
        │   { async{}} │
        └──────────────┘
             │
            ▼
    Расширенные данные
             │
             ▼
        ┌────────────────┐
        │  flowOn(IO)    │  Переключить контекст
        └────────────────┘
             │
            ▼
    В контексте Dispatchers.IO
             │
             ▼
        ┌────────────────┐
        │    catch       │  Обработка ошибок
        │ { fallback() } │
        └────────────────┘
             │
            ▼
    Конечные данные с fallback
             │
            ▼ collect { result -> ... }


TIMING DIAGRAM - COLD FLOW EXECUTION
────────────────────────────────────

Flow creation:  flow { emit(1); emit(2); emit(3) }
                        │
                        ├─ NOT executing yet!
                        │
Subscriber 1 calls .collect():
                        │
                        ▼ STARTS EXECUTION
                    ┌────────────────┐
    Time 0:         │ emit 1         │
                    └────────────────┘
                         │
    Time 1000ms:    ┌────────────────┐
                    │ emit 2         │
                    └────────────────┘
                         │
    Time 2000ms:    ┌────────────────┐
                    │ emit 3         │
                    └────────────────┘
                         │
                        ▼ STOPS

Subscriber 2 calls .collect() (later):
                        │
                        ▼ STARTS EXECUTION (again!)
                    ┌────────────────┐
    Time 3000ms:    │ emit 1         │
                    └────────────────┘
                         │
    Time 4000ms:    ┌────────────────┐
                    │ emit 2         │
                    └────────────────┘

📋 Flow операторы - справочник

Трансформация данных

// map - преобразование каждого элемента
numbersFlow().map { it * 2 }          // 1,2,3,4,5 → 2,4,6,8,10

// filter - отбор элементов
numbersFlow().filter { it % 2 == 0 }  // 1,2,3,4,5 → 2,4

// flatMapConcat - последовательное раскрытие
userIds.asFlow()
    .flatMapConcat { userId ->
        userOrders(userId) // для каждого userId получить его заказы
    }

// flatMapMerge - параллельное раскрытие
userIds.asFlow()
    .flatMapMerge(concurrency = 3) { userId ->
        userOrders(userId) // максимум 3 параллельных запроса
    }

// transform - полный контроль
numbersFlow().transform { value ->
    emit(value)
    emit(value * 2)  // emit может быть вызван несколько раз
}

Операции без изменения потока

// onEach - выполнить side effect для каждого элемента
dataFlow()
    .onEach { data ->
        log.info("Processing: $data")
        metricsService.record(data)
    }

// onStart - выполнить при начале подписки
dataFlow().onStart { log.info("Collection started") }

// onCompletion - выполнить при завершении
dataFlow().onCompletion { log.info("Collection completed") }

// onEmpty - выполнить если поток пуст
dataFlow().onEmpty { log.warn("No data received") }

Объединение потоков

// combine - комбинировать последние значения из двух потоков
val users = usersFlow()
val statuses = statusesFlow()

combine(users, statuses) { user, status ->
    UserWithStatus(user, status)
}

// zip - объединить по паре
val names = namesFlow()  // Alice, Bob, Charlie
val ages = agesFlow()    // 30, 25, 35

zip(names, ages) { name, age ->
    Person(name, age)
}
// Результат: Person(Alice, 30), Person(Bob, 25), Person(Charlie, 35)

// merge - объединить в один поток
val events1 = eventFlow1()
val events2 = eventFlow2()

merge(events1, events2) // все события в один поток

Управление потоком

// take - взять первые N элементов
dataFlow().take(5)

// drop - пропустить первые N элементов
dataFlow().drop(3)

// filter - оставить элементы по условию
dataFlow().filter { it.isValid }

// distinct - только уникальные значения
dataFlow().distinct()

// distinctUntilChanged - пропустить последовательные дубли
dataFlow().distinctUntilChanged()

// debounce - игнорировать быстрые изменения
searchFlow.debounce(300)  // ждём 300ms перед emit

// throttle - ограничить частоту emit
eventFlow.throttle(100)   // максимум один раз за 100ms

// sample - брать значение каждые N ms
sensorFlow.sample(1000)   // снимок каждую секунду

// timeout - отмена если нет данных за время
dataFlow.timeout(5000)    // бросить TimeoutCancellationException если > 5 сек молчания

// retry - переподписаться при ошибке
dataFlow.retry(times = 3) { throwable ->
    throwable is IOException // retry только на сетевых ошибках
}

Терминальные операторы

// collect - базовая подписка
dataFlow().collect { value ->
    println(value)
}

// first - получить первый элемент
val firstValue = dataFlow().first()

// last - получить последний элемент
val lastValue = dataFlow().last()

// single - получить единственный элемент (или ошибка)
val onlyValue = dataFlow().single() // throws if not exactly one

// toList - собрать все элементы в список
val allValues = dataFlow().toList()

// toSet - собрать в set
val uniqueValues = dataFlow().toSet()

// fold - аккумулировать значения
val sum = numbersFlow().fold(0) { acc, value -> acc + value }

// reduce - аккумулировать с начальным значением из потока
val sum = numbersFlow().reduce { acc, value -> acc + value }

🌊 StateFlow и SharedFlow - горячие потоки

// StateFlow - текущее состояние
class UserViewModel {
    private val _state = MutableStateFlow<UiState>(UiState.Loading)
    val state: StateFlow<UiState> = _state.asStateFlow()
    
    fun loadUser(id: String) {
        viewModelScope.launch {
            try {
                _state.value = UiState.Loading
                val user = userService.getUser(id)
                _state.value = UiState.Success(user)
            } catch (e: Exception) {
                _state.value = UiState.Error(e.message)
            }
        }
    }
}

// Подписка - новый подписчик получает ТЕКУЩЕЕ состояние
viewModel.state.collect { state ->
    when (state) {
        is UiState.Loading -> showProgress()
        is UiState.Success -> displayUser(state.user)
        is UiState.Error -> showError(state.message)
    }
}


// SharedFlow - события без состояния
class EventBus {
    private val _events = MutableSharedFlow<AppEvent>()
    val events = _events.asSharedFlow()
    
    suspend fun publishEvent(event: AppEvent) {
        _events.emit(event)
    }
}

// Подписка - получает только НОВЫЕ события
eventBus.events.collect { event ->
    when (event) {
        is UserLoggedIn -> handleLogin(event.userId)
        is OrderCreated -> showNotification(event.orderId)
    }
}

🚀 Практические примеры

Пример 1: REST API с retry и timeout

suspend fun fetchUserDataRobust(userId: String): User = withContext(Dispatchers.IO) {
    userFlow(userId)
        .retry(times = 3) { throwable ->
            delay(1000) // exponential backoff
            throwable is IOException || throwable is TimeoutException
        }
        .timeout(10000) // 10 second timeout
        .catch { exception ->
            log.error("Failed to fetch user after retries", exception)
            throw UserServiceException("Could not load user", exception)
        }
        .first()
}

fun userFlow(userId: String): Flow<User> = flow {
    val user = httpClient.get("/users/$userId")
    emit(user)
}

Пример 2: Pagination через Flow

fun loadAllUsersFlow(pageSize: Int = 50): Flow<User> = flow {
    var page = 0
    while (currentCoroutineContext().isActive) {
        val users = userRepository.findByPage(page, pageSize)
        if (users.isEmpty()) break
        
        users.forEach { emit(it) }
        page++
    }
}

// Использование
suspend fun processAllUsers() {
    loadAllUsersFlow()
        .filter { it.isActive }
        .map { it.toLDTO() }
        .take(100)
        .collect { user ->
            processUser(user)
        }
}

Пример 3: Поиск с debounce

class SearchViewModel {
    private val _searchQuery = MutableStateFlow("")
    
    val searchResults: StateFlow<List<SearchResult>> =
        _searchQuery
            .debounce(300) // wait 300ms before searching
            .distinctUntilChanged()
            .flatMapLatest { query ->
                if (query.isBlank()) flowOf(emptyList())
                else searchService.search(query)
            }
            .stateIn(
                scope = viewModelScope,
                started = SharingStarted.WhileSubscribed(5000),
                initialValue = emptyList()
            )
    
    fun updateSearchQuery(query: String) {
        _searchQuery.value = query
    }
}

🛡️ Error Handling в Flow

// catch - обработка ошибок в upstream
dataFlow()
    .map { risky(it) }
    .catch { exception ->
        when (exception) {
            is NetworkException -> emit(CachedData)
            is ParseException -> log.error("Parse failed", exception)
            else -> throw exception
        }
    }
    .collect { value -> println(value) }

// onCompletion - cleanup
dataFlow()
    .onCompletion { cause ->
        if (cause is CancellationException) {
            log.info("Cancelled")
        } else if (cause != null) {
            log.error("Failed", cause)
        } else {
            log.info("Completed successfully")
        }
    }
    .collect { value -> println(value) }

📋 Чеклист при использовании Flow

Корутины и Spring Framework

Интеграция корутин в Spring

Spring Framework с версии 5.2 предоставляет нативную поддержку корутин Kotlin, позволяя писать неблокирующие реактивные приложения в императивном стиле. Это дает возможность использовать знакомый синтаксис вместо сложных reactive chains.

Ключевые преимущества:

  • Императивный код вместо callback chains
  • Structured concurrency — автоматическое управление жизненным циклом
  • Высокая производительность — неблокирующие I/O операции
  • Простота отладки — линейный flow выполнения

📊 Spring MVC vs WebFlux + Coroutines

SPRING MVC (Blocking)
═════════════════════

@RestController
class UserController(private val userService: UserService) {
    
    @GetMapping("/users/{id}")
    fun getUser(@PathVariable id: String): User {
        return userService.findById(id) // ← БЛОКИРУЕТ ПОТОК
    }
}

Request workflow:
┌─ HTTP Request arrives
├─ Spring allocates Thread from pool
├─ getUser() called → blocks thread
│  └─ userService.findById() → JDBC query ← БЛОКИРУЕТ!
├─ Thread frozen while waiting
├─ Response sent
└─ Thread released


SPRING WEBFLUX + COROUTINES (Non-blocking)
═══════════════════════════════════════════

@RestController
class UserControllerCoroutines(private val userService: UserService) {
    
    @GetMapping("/users/{id}")
    suspend fun getUser(@PathVariable id: String): User {
        return userService.findById(id) // suspend, не блокирует!
    }
}

Request workflow:
┌─ HTTP Request arrives
├─ Spring allocates Coroutine (not thread!)
├─ getUser() called → suspends
│  └─ userService.findById() → R2DBC query ← SUSPENDS, не блокирует!
├─ Thread freed for other requests
├─ Query complete → resume coroutine
├─ Response sent
└─ Coroutine released


SCALABILITY COMPARISON
══════════════════════

Thread Pool: 200 threads
├─ Blocks on I/O → threads frozen
├─ Can serve ~200 concurrent requests
└─ Memory: ~200-300 MB

Coroutine Pool: 2 threads (Dispatchers.IO: 64+)
├─ Suspends on I/O → threads active
├─ Can serve ~10,000+ concurrent requests  
└─ Memory: ~1-2 MB

🎯 Основные компоненты

1. Suspend контроллеры

@RestController
@RequestMapping("/api/users")
class UserControllerCoroutines(
    private val userService: UserService,
    private val orderService: OrderService
) {
    
    // Простой случай: one-to-one операция
    @GetMapping("/{id}")
    suspend fun getUserWithOrders(@PathVariable id: String): UserWithOrders {
        val user = userService.findById(id) // suspend
        val orders = orderService.findByUserId(id) // suspend
        
        return UserWithOrders(user, orders)
    }
    
    // ⏱️ TIMING:
    // Sequential suspend: ~200ms (if each takes 100ms)
    // Parallel would be better!
    
    // Параллельные операции
    @GetMapping("/{id}/dashboard")
    suspend fun getUserDashboard(@PathVariable id: String): Dashboard = coroutineScope {
        val userAsync = async { userService.findById(id) }
        val ordersAsync = async { orderService.findByUserId(id) }
        val statsAsync = async { statsService.getUserStats(id) }
        
        Dashboard(
            user = userAsync.await(),
            orders = ordersAsync.await(),
            stats = statsAsync.await()
        )
    }
    
    // ⏱️ TIMING:
    // Parallel async: ~100ms (best of all)
}

2. R2DBC - реактивный доступ к БД

// ✅ R2DBC repositories возвращают suspend функции
interface UserRepository : CoroutineCrudRepository<User, String> {
    suspend fun findById(id: String): User? // suspend, не blocking!
    suspend fun findByEmail(email: String): User?
    suspend fun save(user: User): User
}

// Spring автоматически преобразует Mono/Flux в suspend функции!

@Service
class UserService(private val userRepository: UserRepository) {
    
    suspend fun findById(id: String): User? {
        return userRepository.findById(id)
    }
    
    suspend fun findAll(): List<User> {
        return userRepository.findAll().toList() // Flow to List
    }
    
    suspend fun save(user: User): User {
        return userRepository.save(user)
    }
}

// Batch операции
@Service
class BatchUserService(private val userRepository: UserRepository) {
    
    suspend fun saveAllUsers(users: List<User>) = supervisorScope {
        // Параллельное сохранение батчами
        users.chunked(100)
            .map { batch ->
                async {
                    batch.forEach { userRepository.save(it) }
                }
            }
            .forEach { it.await() }
    }
}

3. WebClient - неблокирующий HTTP клиент

@Service
class ExternalApiService(
    private val webClient: WebClient,
    private val exceptionHandler: CoroutineExceptionHandler
) {
    
    suspend fun fetchRemoteUser(id: String): RemoteUser = 
        withContext(Dispatchers.IO + exceptionHandler) {
            withTimeout(5000) {
                webClient
                    .get()
                    .uri("/users/{id}", id)
                    .retrieve()
                    .awaitBody<RemoteUser>()
            }
        }
    
    suspend fun fetchMultipleUsers(ids: List<String>): List<RemoteUser> = 
        supervisorScope {
            ids.map { id ->
                async { 
                    try {
                        fetchRemoteUser(id)
                    } catch (e: Exception) {
                        log.warn("Failed to fetch user $id", e)
                        null
                    }
                }
            }
            .mapNotNull { it.await() }
        }
}

// ⏱️ TIMING:
// Sequential: 5 users × 100ms = 500ms
// Parallel async: max(100ms) = 100ms ✓

🔒 Spring Security с корутинами

// Получение current user из Security Context

@Service
class SecureUserService(private val userRepository: UserRepository) {
    
    suspend fun getCurrentUser(): User? {
        return SecurityContextHolder.getContext()?.authentication?.let { auth ->
            userRepository.findByEmail(auth.name)
        }
    }
    
    suspend fun getUserData(): UserData = coroutineScope {
        val currentUser = getCurrentUser() 
            ?: throw UnauthorizedException("Not authenticated")
        
        val user = async { userRepository.findById(currentUser.id) }
        val orders = async { fetchUserOrders(currentUser.id) }
        
        UserData(
            user = user.await(),
            orders = orders.await()
        )
    }
}

// Security filter для корутин
@Component
class CoroutineSecurityFilter : OncePerRequestFilter() {
    
    override fun doFilterInternal(
        request: HttpServletRequest,
        response: HttpServletResponse,
        filterChain: FilterChain
    ) {
        // Spring Security работает с корутинами через ThreadLocal
        // обычным способом (за сценами работает правильно)
        filterChain.doFilter(request, response)
    }
}

📊 Error handling в Spring WebFlux

@RestController
@RequestMapping("/api")
class ErrorHandlingController(private val service: Service) {
    
    @GetMapping("/safe")
    suspend fun safeEndpoint(): ResponseEntity<Data> {
        return try {
            val data = service.fetchData()
            ResponseEntity.ok(data)
        } catch (e: BusinessException) {
            ResponseEntity.status(HttpStatus.BAD_REQUEST)
                .body(ErrorResponse(e.message))
        } catch (e: Exception) {
            log.error("Unexpected error", e)
            ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .build()
        }
    }
    
    // Global exception handler
    @ExceptionHandler(TimeoutException::class)
    suspend fun handleTimeout(ex: TimeoutException): ResponseEntity<ErrorResponse> {
        return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
            .body(ErrorResponse("Operation timed out"))
    }
}

// ExceptionHandler filter
@Component
class CoroutineExceptionHandlerFilter : OncePerRequestFilter() {
    
    override fun doFilterInternal(
        request: HttpServletRequest,
        response: HttpServletResponse,
        filterChain: FilterChain
    ) {
        try {
            filterChain.doFilter(request, response)
        } catch (e: CancellationException) {
            // Handle gracefully, don't log as error
            log.debug("Request cancelled")
        } catch (e: Exception) {
            log.error("Request failed", e)
            response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value())
        }
    }
}

🌊 Server-Sent Events (SSE) с Flow

@RestController
@RequestMapping("/api/events")
class EventStreamController(private val eventService: EventService) {
    
    @GetMapping("/stream")
    suspend fun streamEvents(): ResponseEntity<Flow<ServerSentEvent<Event>>> {
        return ResponseEntity.ok(
            eventService.eventFlow()
                .map { ServerSentEvent.builder<Event>()
                    .id(it.id)
                    .data(it)
                    .build()
                }
                .catch { e ->
                    log.error("Event stream error", e)
                    emit(ServerSentEvent.builder<Event>()
                        .comment("Error: ${e.message}")
                        .build())
                }
        )
    }
}

@Service
class EventService(private val repository: EventRepository) {
    
    fun eventFlow(): Flow<Event> = flow {
        while (currentCoroutineContext().isActive) {
            val newEvents = repository.findNewEvents()
            newEvents.forEach { emit(it) }
            delay(1000) // poll every second
        }
    }
}

// ⏱️ TIMING:
// 1000 clients × Flow push = 1 thread handling all!

🎯 Миграция с Mono/Flux на suspend

// БЫЛО: Reactor way (Mono/Flux chains)
@GetMapping("/old-way")
fun getUserOldWay(@PathVariable id: String): Mono<User> {
    return userRepository.findById(id)
        .doOnNext { log.info("Found user: ${it.name}") }
        .onErrorMap { UserNotFoundException(it.message) }
        .timeout(Duration.ofSeconds(5))
}

// СТАЛО: Suspend way (imperative)
@GetMapping("/new-way")
suspend fun getUserNewWay(@PathVariable id: String): User {
    val user = userRepository.findById(id)
        ?: throw UserNotFoundException("User $id not found")
    
    log.info("Found user: ${user.name}")
    
    return user
}

// Конвертация если нужно:
@GetMapping("/hybrid")
suspend fun hybridEndpoint(): Flow<User> {
    return userRepository.findAll() // R2DBC returns Flow
        .onEach { log.info("Processing user: ${it.name}") }
        .timeout(5.seconds) // timeout для Flow
}

📊 Практический пример: Полный микросервис

@Configuration
class CoroutineConfig {
    @Bean
    fun webClient(): WebClient = WebClient.create()
}

@RestController
@RequestMapping("/api/users")
class UserApiController(
    private val userService: UserService,
    private val exceptionHandler: CoroutineExceptionHandler
) {
    
    @GetMapping
    suspend fun listUsers(): List<UserDTO> {
        return userService.getAllUsers()
            .map { it.toDTO() }
    }
    
    @GetMapping("/{id}")
    suspend fun getUser(@PathVariable id: String): UserDTO {
        return userService.findById(id)?.toDTO()
            ?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
    }
    
    @PostMapping
    suspend fun createUser(@RequestBody request: CreateUserRequest): UserDTO {
        val user = userService.create(request)
        return user.toDTO()
    }
    
    @GetMapping("/{id}/dashboard")
    suspend fun getUserDashboard(
        @PathVariable id: String
    ): UserDashboardDTO = coroutineScope {
        val userAsync = async { userService.findById(id) }
        val ordersAsync = async { userService.getUserOrders(id) }
        val statisticsAsync = async { userService.getUserStats(id) }
        
        UserDashboardDTO(
            user = userAsync.await()?.toDTO()
                ?: throw ResponseStatusException(HttpStatus.NOT_FOUND),
            orders = ordersAsync.await(),
            statistics = statisticsAsync.await()
        )
    }
}

@Service
class UserService(
    private val userRepository: UserRepository,
    private val orderRepository: OrderRepository,
    private val externalApi: ExternalApiService
) {
    
    suspend fun findById(id: String): User? {
        return userRepository.findById(id)
    }
    
    suspend fun getAllUsers(): List<User> {
        return userRepository.findAll().toList()
    }
    
    suspend fun create(request: CreateUserRequest): User {
        val user = User(
            id = UUID.randomUUID().toString(),
            name = request.name,
            email = request.email
        )
        return userRepository.save(user)
    }
    
    suspend fun getUserOrders(userId: String): List<Order> {
        return orderRepository.findByUserId(userId).toList()
    }
    
    suspend fun getUserStats(userId: String): UserStats = supervisorScope {
        val ordersCountAsync = async { 
            orderRepository.findByUserId(userId).count() 
        }
        val totalSpentAsync = async {
            orderRepository.findByUserId(userId)
                .map { it.amount }
                .fold(0.0) { acc, value -> acc + value }
        }
        
        UserStats(
            ordersCount = ordersCountAsync.await(),
            totalSpent = totalSpentAsync.await()
        )
    }
}

@Repository
interface UserRepository : CoroutineCrudRepository<User, String>

@Repository
interface OrderRepository : CoroutineRepository<Order, String> {
    fun findByUserId(userId: String): Flow<Order>
}

📋 Чеклист: Spring + Coroutines production readiness


🎓 Ключевые выводы

  1. Suspend контроллеры = простой способ к non-blocking endpoints
  2. R2DBC = неблокирующий доступ к БД (вместо JDBC)
  3. WebClient = неблокирующие HTTP запросы
  4. async/await = параллельные операции в контроллерах
  5. supervisorScope = graceful degradation для независимых операций
  6. Exception handling = обязателен для production
  7. Timeout = защита от зависаний
  8. Flow for streaming = идеально для SSE и push уведомлений
  9. Security context = работает нормально с корутинами
  10. Performance = 10x-100x лучше чем thread pool approach

Производительность и оптимизация

📊 Утечки памяти - главные источники

GlobalScope - источник утечек памяти

GlobalScope — это глобальная область видимости корутин, которая живёт всё время жизни приложения. Использование GlobalScope может привести к утечкам памяти и неконтролируемому выполнению корутин.

MEMORY LEAK VISUALIZATION
═════════════════════════

GlobalScope Approach (❌ ОПАСНО):
─────────────────────────────────

Object created:
    │
    └─ GlobalScope.launch { }
         │
         └─ Coroutine continues forever
            ├─ Holds reference to context
            ├─ Holds reference to captured variables
            └─ Never cancelled!

Object destroyed:
    └─ But coroutine still running in GlobalScope
       └─ Memory leak! Object cannot be GC'd


CoroutineScope Approach (✅ ПРАВИЛЬНО):
──────────────────────────────────────

Object created:
    │
    └─ launch { } (внутри scope)
         │
         └─ Coroutine running
            ├─ Holds reference to context
            └─ Linked to scope lifecycle

Object destroyed:
    └─ job.cancel()
         └─ All coroutines cancelled
            ├─ References released
            └─ Object can be GC'd ✓
// ❌ ПЛОХО - утечка памяти (никогда не отменяется!)
class UserService {
    fun loadUserData(userId: String) {
        GlobalScope.launch {
            val userData = fetchUserFromApi(userId) // может выполняться вечно
            updateCache(userData)
        }
        // Нет способа отменить эту корутину!
        // Если UserService уничтожается, корутина продолжает работать
    }
}

// ❌ ПЛОХО - долгоживущие ссылки
class MainActivity {
    fun startBackgroundWork() {
        GlobalScope.launch {
            while (true) {
                processData() // бесконечный цикл
                delay(1000)
                // Активность может быть уничтожена, но корутина продолжит работать
                // и будет держать ссылку на контекст через замыкание
            }
        }
    }
}

// ✅ ХОРОШО - контролируемый scope
class UserService : CoroutineScope {
    private val job = SupervisorJob()
    override val coroutineContext = Dispatchers.IO + job
    
    fun loadUserData(userId: String) {
        launch { // привязано к scope сервиса
            val userData = fetchUserFromApi(userId)
            updateCache(userData)
        }
    }
    
    fun cleanup() {
        job.cancel() // отменяет ВСЕ корутины сервиса
    }
}

// ✅ ХОРОШО - scope привязанный к lifecycle
class BackgroundTaskManager {
    private var serviceJob: Job? = null
    
    fun start() {
        serviceJob = CoroutineScope(Dispatchers.Default).launch {
            while (isActive) { // проверяет отмену
                processData()
                delay(1000)
            }
        }
    }
    
    fun stop() {
        serviceJob?.cancel() // явная отмена
        serviceJob = null
    }
}

⚠️ Проблемы с ViewModels (Android)

// ❌ ПЛОХО - утечка ViewModel
class UserViewModel : ViewModel() {
    fun loadData() {
        GlobalScope.launch { // переживёт ViewModel!
            val data = repository.loadData()
            // Попытка обновить UI после уничтожения ViewModel
            _uiState.value = UiState.Success(data)
        }
    }
}

// ✅ ХОРОШО - viewModelScope
class UserViewModel : ViewModel() {
    fun loadData() {
        viewModelScope.launch { // автоматически отменяется в onCleared()
            val data = repository.loadData()
            _uiState.value = UiState.Success(data)
        }
    }
    
    // Bonus: можно переопределить для тестирования
    override fun onCleared() {
        super.onCleared() // отменяет все корутины
        log.info("ViewModel cleared, all coroutines cancelled")
    }
}

🎯 UI Блокировка при неправильном Dispatcher

Проблема: Main Dispatcher

Main Dispatcher предназначен для UI операций и имеет один поток. Выполнение тяжёлых операций в Main Dispatcher заблокирует UI.

BLOCKING DIAGRAM
════════════════

ПРАВИЛЬНО (60 FPS):
┌─ Frame 0ms:  ▓
├─ Frame 16ms: ▓ (UI responsive)
├─ Frame 32ms: ▓
└─ Frame 48ms: ▓

НЕПРАВИЛЬНО (UI FREEZES):
┌─ Frame 0ms:  ▓
├─ Frame 16ms: █████████████ (BLOCKED!)
├─ Frame 32ms: █████████████ (BLOCKED!)
├─ Frame 48ms: █████████████ (BLOCKED!)
└─ Frame 64ms: ▓ (finally responsive, but 4 frames lost!)


Main Thread Timeline:
═════════════════════

❌ BAD - Heavy computation blocks main thread:
┌────────────────────────────────────────────┐
│ Main Thread                                │
├────────────────────────────────────────────┤
│ withContext(Dispatchers.Main) {           │
│   heavyComputation() ← BLOCKS 500ms!      │
│ }                                          │
│ // UI frozen for 500ms                    │
└────────────────────────────────────────────┘

✅ GOOD - Heavy computation in background:
┌─────────────────────────────┐
│ Main Thread (responsive)    │
├─────────────────────────────┤
│ withContext(Main) { }       │ ← fast
│                             │
│ withContext(Dispatchers... } ← background
│                             │
│ showProgress()              │ ← responsive
└─────────────────────────────┘
// ❌ ПЛОХО - блокировка UI
class DataProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) {
        // Выполняется в Main dispatcher по умолчанию
        data.forEach { item ->
            val result = performHeavyComputation(item) // БЛОКИРУЕТ UI!
            updateProgressBar(result.progress)
        }
    }
}

// ❌ ПЛОХО - синхронные операции в Main
suspend fun loadDataFromNetwork(): String {
    // Если это выполняется в Main dispatcher
    return URL("https://api.example.com/data")
        .readText() // Блокирующий I/O вызов! UI зависнет
}

// ✅ ХОРОШО - правильное использование dispatchers
class DataProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) = withContext(Dispatchers.Main) {
        showProcessing()
        
        data.forEach { item ->
            // Тяжёлые вычисления в background
            val result = withContext(Dispatchers.Default) {
                performHeavyComputation(item)
            }
            // Обновление UI в Main потоке
            updateProgressBar(result.progress)
        }
        
        hideProcessing()
    }
}

// ✅ ХОРОШО - правильная архитектура
class NetworkService {
    suspend fun loadData(): String = withContext(Dispatchers.IO) {
        // I/O операции в IO dispatcher
        httpClient.get("https://api.example.com/data")
    }
    
    suspend fun processData(rawData: String): ProcessedData = withContext(Dispatchers.Default) {
        // CPU-интенсивные операции в Default dispatcher
        parseAndProcess(rawData)
    }
}

class UIController {
    suspend fun refreshData() {
        // UI код остаётся в Main dispatcher
        showLoading()
        
        try {
            val rawData = networkService.loadData()
            val processedData = networkService.processData(rawData)
            
            // Обновление UI в Main dispatcher
            displayData(processedData)
        } catch (e: Exception) {
            showError(e.message)
        } finally {
            hideLoading()
        }
    }
}

⚠️ Блокирующие операции в корутинах

// ❌ ПЛОХО - блокирующие операции в корутинах
suspend fun badDatabaseAccess() {
    // Блокирующий JDBC в корутине
    val connection = DriverManager.getConnection(jdbcUrl)
    val resultSet = connection.createStatement()
        .executeQuery("SELECT * FROM users") // блокирует поток!
}

// ✅ ХОРОШО - неблокирующие операции (R2DBC)
suspend fun goodDatabaseAccess() {
    // R2DBC или другие неблокирующие драйверы
    val users = userRepository.findAll() // suspend функция, не блокирует!
    return users
}

// ✅ ХОРОШО - изоляция блокирующих операций
suspend fun legacySystemIntegration() = withContext(Dispatchers.IO.limitedParallelism(4)) {
    // Изолируем блокирующий код в IO dispatcher с ограничением
    val legacyClient = LegacyJdbcClient()
    legacyClient.blockingOperation() // ограничено 4 потоками макс
}

📊 launch vs async - когда использовать

launch - Fire and Forget (side effects)

// ✅ ХОРОШО - launch для независимых side effects
class NotificationService {
    suspend fun sendNotifications(users: List<User>) {
        users.forEach { user ->
            // Не ждём завершения каждой отправки
            launch {
                emailService.sendEmail(user.email, "Welcome!")
                analyticsService.trackEmailSent(user.id)
            }
        }
        // Функция завершается сразу, уведомления отправляются в background
    }
}

// ❌ ПЛОХО - неправильное использование launch
class DataService {
    suspend fun getUserData(id: String): UserData {
        var userData: UserData? = null
        
        launch { // launch НЕ блокирует выполнение!
            userData = fetchUserData(id)
        }
        
        return userData!! // ОШИБКА! userData всегда null
    }
}

// ✅ REAL-WORLD: Order processing
class OrderProcessor {
    suspend fun processOrder(order: Order): ProcessedOrder {
        // Основная логика обработки заказа
        val processedOrder = validateAndProcessOrder(order)
        
        // Независимые side effects - не ждём завершения
        launch { sendOrderConfirmation(order.customerEmail) }
        launch { updateInventory(order.items) }
        launch { logOrderProcessing(order.id) }
        launch { updateAnalytics(order) }
        
        // Возвращаем результат сразу
        return processedOrder
    }
}

async - Параллельные операции с результатом

// ✅ ХОРОШО - async для параллельных операций
class UserProfileService {
    suspend fun loadCompleteProfile(userId: String): CompleteProfile = coroutineScope {
        // Запускаем операции параллельно
        val profileAsync = async { userService.getProfile(userId) }
        val ordersAsync = async { orderService.getUserOrders(userId) }
        val preferencesAsync = async { preferencesService.getUserPreferences(userId) }
        
        // Ждём завершения всех операций (максимум времени самой долгой)
        CompleteProfile(
            profile = profileAsync.await(),
            orders = ordersAsync.await(),
            preferences = preferencesAsync.await()
        )
    }
}

// ⏱️ TIMING DIAGRAM - async vs sequential:
//
// Sequential:
// ├─ fetchUser()      ▓▓▓ 500ms
// ├─ fetchOrders()        ▓▓▓ 500ms
// ├─ fetchPrefs()             ▓▓▓ 500ms
// Total: 1500ms ❌
//
// Parallel with async:
// ├─ fetchUser()      ▓▓▓
// ├─ fetchOrders()    ▓▓▓  (concurrent)
// ├─ fetchPrefs()     ▓▓▓
// Total: 500ms ✓ (3x faster!)

Таблица сравнения: launch vs async

Аспект launch async
Возвращаемое значение Job Deferred
Получение результата Не предусмотрено .await()
Назначение Side effects Параллельные вычисления
Исключения Всплывают к родителю Заморожены до await()
Блокировка вызывающей Не блокирует Блокирует на await()
Use case Логирование, уведомления API вызовы, вычисления

🔄 Корутины vs RxJava/Project Reactor

Парадигмы сравнения

IMPERATIVE vs REACTIVE
══════════════════════

Coroutines (Imperative):
    1. Код выглядит как обычный синхронный код
    2. Читаем сверху вниз (линейно)
    3. Отладка как обычно через debugger
    4. Exception handling через try-catch
    Пример: "Получить user, потом его orders"

RxJava (Functional Reactive):
    1. Код - цепочка трансформаций
    2. Читаем как pipeline (operator chains)
    3. Отладка через .doOnNext(), логи
    4. Exception handling через обработчики в цепи
    Пример: "userObservable.flatMap(orders).map(process)"
// RxJava - функциональная цепочка
class RxUserService {
    fun loadUserProfile(userId: String): Single<UserProfile> {
        return userRepository.findById(userId)
            .flatMap { user ->
                ordersRepository.findByUserId(user.id)
                    .map { orders -> UserProfile(user, orders) }
            }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .timeout(30, TimeUnit.SECONDS)
            .retry(3)
    }
}

// Корутины - императивный подход (проще!)
class CoroutineUserService {
    suspend fun loadUserProfile(userId: String): UserProfile = withContext(Dispatchers.IO) {
        return@withContext try {
            val user = userRepository.findById(userId)
            val orders = ordersRepository.findByUserId(user.id)
            UserProfile(user, orders)
        } catch (e: Exception) {
            retry(3) { // простой retry
                UserProfile(
                    userRepository.findById(userId),
                    ordersRepository.findByUserId(userId)
                )
            }
        }
    }
}

Преимущества корутин - Читаемость

// RxJava - сложно отлаживать (вложенные flatMap)
fun complexRxChain(): Observable<Result> {
    return dataSource1.loadData()
        .flatMap { data1 ->
            dataSource2.loadData(data1.id)
                .flatMap { data2 ->
                    dataSource3.processData(data1, data2)
                        .map { processedData ->
                            Result(data1, data2, processedData)
                        }
                }
        }
        .onErrorResumeNext { error ->
            handleError(error)
                .andThen(Observable.just(defaultResult))
        }
}

// Корутины - линейный код (легко понять!)
suspend fun complexCoroutineChain(): Result {
    return try {
        val data1 = dataSource1.loadData()
        val data2 = dataSource2.loadData(data1.id)
        val processedData = dataSource3.processData(data1, data2)
        Result(data1, data2, processedData)
    } catch (error: Exception) {
        handleError(error)
        defaultResult
    }
}

Таблица сравнения: Coroutines vs RxJava vs Reactor

Аспект Корутины RxJava Reactor
Парадигма Императивная Функциональная Функциональная
Читаемость ⭐⭐⭐⭐⭐ ⭐⭐⭐ ⭐⭐⭐
Отладка ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐
Exception handling ⭐⭐⭐⭐⭐ ⭐⭐ ⭐⭐
Операторы ⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐
Backpressure Автоматическая Ручная настройка Ручная настройка
Learning curve ⭐⭐⭐⭐⭐ (низкая) ⭐⭐ (высокая) ⭐⭐ (высокая)
Performance ⭐⭐⭐⭐ ⭐⭐⭐⭐⭐ ⭐⭐⭐⭐⭐

Decision Tree: Корутины vs RxJava

Выбираешь между Coroutines и RxJava?
│
├─ Новый проект?
│  └─ YES → Используй Coroutines (проще)
│
├─ Уже используется RxJava?
│  └─ YES → Продолжай RxJava (консистентность)
│
├─ Нужна простота и readability?
│  └─ YES → Coroutines лучше
│
├─ Сложная stream обработка (debounce, combineLatest, etc)?
│  └─ YES → RxJava может быть проще
│
├─ Нужна точная backpressure стратегия?
│  └─ YES → RxJava лучше
│
└─ Команда не знакома с Rx?
   └─ YES → Coroutines (низкий порог входа)

🚀 Performance оптимизации

Batch Processing

PERFORMANCE COMPARISON
══════════════════════

❌ INEFFICIENT: One coroutine per item
10,000 items × create coroutine:
├─ Allocation: 10,000 × allocation overhead
├─ Context switches: 10,000 × switches
├─ GC pressure: High (temporary objects)
└─ Time: ~500ms

✅ EFFICIENT: Batched processing
10,000 items / 100 per batch = 100 batches:
├─ Allocation: 100 × allocation overhead
├─ Context switches: 100 × switches  
├─ GC pressure: Low
└─ Time: ~50ms (10x faster!)
// ❌ ПЛОХО - создание множества корутин (неэффективно)
class IneffientProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) = supervisorScope {
        data.forEach { item ->
            launch { // создаём корутину для каждого элемента!
                processItem(item)
            }
        }
    }
}

// ✅ ХОРОШО - батчевая обработка (эффективно)
class EfficientProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) = coroutineScope {
        data.chunked(100) // группы по 100 элементов
            .map { batch ->
                async {
                    batch.forEach { item ->
                        processItem(item) // обрабатываем в одной корутине
                    }
                }
            }
            .awaitAll()
    }
}

// ✅ PARALLEL BATCHES с лимитом
class OptimizedProcessor {
    suspend fun processLargeDataset(data: List<DataItem>) {
        data.chunked(100)
            .asFlow()
            .flatMapMerge(concurrency = 4) { batch ->
                flow {
                    batch.forEach { processItem(it) }
                }
            }
            .collect()
    }
}

Dispatcher оптимизация

// ❌ ПЛОХО - неэффективное использование (много context switches)
class BadPerformanceService {
    suspend fun processData(items: List<Item>): List<Result> {
        return items.map { item ->
            // Context switch для каждого элемента!
            withContext(Dispatchers.Default) {
                heavyComputation(item)
            }
        }
    }
}

// ✅ ХОРОШО - один context switch для всей работы
class GoodPerformanceService {
    suspend fun processData(items: List<Item>): List<Result> = 
        withContext(Dispatchers.Default) {
            // Один context switch - вся работа в Default потоке
            items.map { item ->
                heavyComputation(item)
            }
        }
}

// ✅ REAL-WORLD: Limited parallelism для legacy code
class LegacyIntegration {
    private val limitedIO = Dispatchers.IO.limitedParallelism(4)
    
    suspend fun processWithLegacy(items: List<Item>): List<Result> =
        withContext(limitedIO) {
            items.map { item ->
                legacyBlockingCall(item) // максимум 4 одновременных вызовов
            }
        }
}

📋 Production Readiness Checklist


🎓 Ключевые выводы

  1. GlobalScope = антипаттерн в production коде
  2. CoroutineScope для полного контроля жизненного цикла
  3. Правильный Dispatcher = основа performance
  4. launch для side effects, async для результатов
  5. Батчевая обработка = оптимизация для больших наборов
  6. Coroutines проще чем RxJava для большинства случаев
  7. Context switching = главный overhead, минимизируйте его
  8. Блокирующие операции = враги корутин
  9. Structured concurrency = гарантированное управление ресурсами
  10. Testing важен - concurrency баги сложно отловить!

Тестирование и оптимизация

🧪 Тестирование suspend функций

Базовое тестирование с runBlocking

class UserServiceTest {
    
    @Test
    fun testUserLoading() = runBlocking {
        // runBlocking блокирует текущий поток до завершения
        val user = userService.loadUser("123")
        
        assertEquals("John", user.name)
        assertTrue(user.isActive)
    }
    
    @Test
    fun testErrorHandling() = runBlocking {
        assertThrows<UserNotFoundException> {
            userService.loadUser("invalid")
        }
    }
}

Тестирование с TestDispatchers (рекомендуется)

class UserRepositoryTest {
    
    private val testDispatcher = StandardTestDispatcher()
    
    @Before
    fun setUp() {
        Dispatchers.setMain(testDispatcher)
    }
    
    @After
    fun tearDown() {
        Dispatchers.resetMain()
    }
    
    @Test
    fun testDataFetching() = runTest(testDispatcher) {
        // runTest управляет временем - нет реальной задержки
        val repository = UserRepository()
        
        val user = repository.getUser("123")
        
        assertEquals("John", user.name)
    }
}

Тестирование Flow

class UserFlowRepositoryTest {
    
    @Test
    fun testFlowTransformation() = runTest {
        val flow = userIds.asFlow()
            .map { id -> userService.getUser(id) }
            .filter { user -> user.isActive }
        
        val results = flow.toList() // собрать все элементы
        
        assertEquals(3, results.size)
        assertTrue(results.all { it.isActive })
    }
    
    @Test
    fun testFlowError() = runTest {
        val flow = flow<String> {
            emit("value1")
            throw Exception("Oops!")
        }
        
        assertThrows<Exception> {
            flow.toList()
        }
    }
    
    @Test
    fun testFlowWithCollect() = runTest {
        val values = mutableListOf<Int>()
        
        (1..3).asFlow()
            .collect { value ->
                values.add(value)
            }
        
        assertEquals(listOf(1, 2, 3), values)
    }
}

Тестирование с Mocks

class OrderServiceTest {
    
    @get:Rule
    val instantExecutorRule = InstantTaskExecutorRule()
    
    private val mockRepository: OrderRepository = mock()
    private lateinit var service: OrderService
    
    @Before
    fun setUp() {
        service = OrderService(mockRepository)
    }
    
    @Test
    fun testOrderProcessing() = runTest {
        // Mock setup
        val mockOrder = Order(id = "123", amount = 100.0)
        coEvery { mockRepository.findById("123") } returns mockOrder
        
        // Execute
        val result = service.processOrder("123")
        
        // Verify
        assertEquals("123", result.orderId)
        coVerify { mockRepository.findById("123") }
    }
}

🚀 Production patterns

Паттерн 1: Graceful Degradation

class ResilientDataService {
    
    suspend fun loadDashboardData(userId: String): DashboardData = supervisorScope {
        // Критичные данные
        val userAsync = async {
            try { userService.getUser(userId) }
            catch (e: Exception) { 
                log.error("User loading failed", e)
                null 
            }
        }
        
        // Опциональные данные
        val recommendationsAsync = async {
            try { recommendationService.get(userId) }
            catch (e: Exception) { 
                log.warn("Recommendations failed", e)
                emptyList<Recommendation>() 
            }
        }
        
        val analyticsAsync = async {
            try { analyticsService.track(userId) }
            catch (e: Exception) { 
                log.warn("Analytics failed", e)
            }
        }
        
        DashboardData(
            user = userAsync.await(),
            recommendations = recommendationsAsync.await(),
            analytics = analyticsAsync.await()
        )
    }
}

Паттерн 2: Retry с exponential backoff

suspend inline fun <T> withRetry(
    times: Int = 3,
    initialDelay: Long = 100,
    maxDelay: Long = 10000,
    factor: Double = 2.0,
    crossinline block: suspend () -> T
): T {
    var currentDelay = initialDelay
    var lastException: Exception? = null
    
    repeat(times - 1) { attempt ->
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // никогда не ретраим отмену
        } catch (e: Exception) {
            lastException = e
            log.warn("Attempt ${attempt + 1} failed, retrying in ${currentDelay}ms", e)
            delay(currentDelay)
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
        }
    }
    
    return try {
        block() // последняя попытка
    } catch (e: Exception) {
        throw lastException ?: e
    }
}

// Использование
suspend fun reliableApiCall(): ApiResponse = withRetry(times = 3) {
    apiClient.fetchData()
}

Паттерн 3: Circuit Breaker

class CircuitBreakerService {
    
    private var failureCount = 0
    private var lastFailureTime = 0L
    private val circuitBreakerTimeout = 30_000L
    
    suspend fun <T> executeWithCircuitBreaker(
        operationName: String,
        block: suspend () -> T
    ): T {
        if (isCircuitOpen()) {
            throw CircuitBreakerOpenException(
                "$operationName is temporarily unavailable"
            )
        }
        
        return try {
            val result = withTimeout(5000) {
                block()
            }
            resetFailureCount()
            result
        } catch (e: TimeoutCancellationException) {
            recordFailure()
            throw ServiceTimeoutException("$operationName timed out", e)
        } catch (e: Exception) {
            recordFailure()
            throw e
        }
    }
    
    private fun isCircuitOpen(): Boolean {
        return failureCount >= 5 &&
               (System.currentTimeMillis() - lastFailureTime) < circuitBreakerTimeout
    }
    
    private fun recordFailure() {
        failureCount++
        lastFailureTime = System.currentTimeMillis()
    }
    
    private fun resetFailureCount() {
        failureCount = 0
    }
}

📊 Performance optimization

Избегайте блокирующих операций

// ❌ ПЛОХО - блокирующая операция в корутине
suspend fun badDatabaseAccess() {
    val connection = DriverManager.getConnection(jdbcUrl) // БЛОКИРУЕТ!
    val resultSet = connection.createStatement()
        .executeQuery("SELECT * FROM users")
}

// ✅ ХОРОШО - неблокирующая операция
suspend fun goodDatabaseAccess() = withContext(Dispatchers.IO) {
    userRepository.findAll() // R2DBC - nonblocking
}

// ✅ ХОРОШО - изоляция блокирующего кода
suspend fun legacyIntegration() = withContext(Dispatchers.IO.limitedParallelism(4)) {
    legacyJdbcClient.query() // ограничиваем количество одновременных блокирующих операций
}

Оптимальный выбор Dispatcher

I/O операции:        withContext(Dispatchers.IO)
CPU операции:        withContext(Dispatchers.Default)
UI операции (Android/Desktop): withContext(Dispatchers.Main)
Legacy code:         withContext(Dispatchers.IO.limitedParallelism(n))

Правило:
────────
Используйте Dispatchers.Default для CPU-bound операций
Используйте Dispatchers.IO для I/O operations
НЕ используйте Dispatchers.Unconfined в production (опасно!)

Batch processing для performance

suspend fun processItemsEfficiently(items: List<Item>) = coroutineScope {
    // Обработка батчами вместо одного за раз
    val batchSize = 100
    
    items.chunked(batchSize)
        .map { batch ->
            async {
                batch.map { item ->
                    processItem(item)
                }
            }
        }
        .flatMap { it.await() }
}

// Параллельная обработка с лимитом
suspend fun processItemsWithLimit(items: List<Item>, concurrency: Int = 10) {
    items.asFlow()
        .flatMapMerge(concurrency = concurrency) { item ->
            flow { emit(processItem(item)) }
        }
        .collect { /* processed item */ }
}

🛡️ Common pitfalls и как их избежать

Питфол 1: Забытая отмена

// ❌ ПЛОХО
class BadService {
    private val scope = CoroutineScope(Dispatchers.IO)
    
    fun load() {
        scope.launch { /* work */ }
    }
    // Забыл отменить scope при очистке!
}

// ✅ ХОРОШО
class GoodService : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    
    fun load() = launch { /* work */ }
    
    fun cleanup() {
        coroutineContext[Job]?.cancel()
    }
}

Питфол 2: launch вместо async

// ❌ ПЛОХО - ждёшь результата неправильно
suspend fun wrongUsage(): String {
    var result: String? = null
    launch { result = expensiveComputation() }
    return result ?: "default" // ОШИБКА - result всегда null
}

// ✅ ХОРОШО - используй async + await
suspend fun correctUsage(): String = coroutineScope {
    val deferred = async { expensiveComputation() }
    return@coroutineScope deferred.await()
}

Питфол 3: Неправильное обращение с CancellationException

// ❌ ПЛОХО
suspend fun badCancellation() {
    try {
        longRunningOperation()
    } catch (e: Exception) {
        log.error("Operation failed", e) // логируем отмену как ошибку!
    }
}

// ✅ ХОРОШО
suspend fun goodCancellation() {
    try {
        longRunningOperation()
    } catch (e: CancellationException) {
        cleanup()
        throw e // ВСЕГДА перебрасываем
    } catch (e: Exception) {
        log.error("Operation failed", e)
        throw e
    }
}

Питфол 4: GlobalScope

// ❌ ПЛОХО - утечка памяти
class Service {
    fun startBackground() {
        GlobalScope.launch { // объект может быть destroyed, но корутина продолжит работу
            while (true) {
                doWork()
                delay(1000)
            }
        }
    }
}

// ✅ ХОРОШО
class Service : CoroutineScope {
    override val coroutineContext = Dispatchers.IO + SupervisorJob()
    
    fun startBackground() = launch {
        while (isActive) {
            doWork()
            delay(1000)
        }
    }
    
    fun stop() = coroutineContext.cancel()
}

📋 Чеклист production readiness