🏗️ Архитектура корутин
┌─────────────────────────────────────────────────────────┐
│ KOTLIN COROUTINES ARCHITECTURE LAYERS │
├─────────────────────────────────────────────────────────┤
│ │
│ FOUNDATION LAYER (Основы) │
│ ├── suspend функции (приостановимость) │
│ ├── Continuation (internal компилятор) │
│ └── State machine (компилятор генерирует) │
│ │
│ DISPATCHERS LAYER (Где выполняется) │
│ ├── Main ← UI поток (1 поток) │
│ ├── IO ← I/O операции (64+ потоков) │
│ ├── Default ← CPU задачи (CPU cores потоков) │
│ └── Unconfined ← текущий поток (специальные случаи) │
│ │
│ BUILDERS LAYER (Запуск корутин) │
│ ├── launch ← Fire-and-forget (Job) │
│ ├── async ← Параллельно с результатом (Deferred<T>) │
│ └── runBlocking ← Синхронный мост (только для тестов) │
│ │
│ SCOPE LAYER (Иерархия) │
│ ├── CoroutineScope ← обычная иерархия │
│ ├── coroutineScope{} ← временная область │
│ └── supervisorScope{} ← изолированные дети │
│ │
│ CONTEXT LAYER (Конфигурация) │
│ ├── Dispatcher (где) │
│ ├── Job (иерархия) │
│ ├── CoroutineName (отладка) │
│ ├── CoroutineExceptionHandler (ошибки) │
│ └── Custom elements (для приложений) │
│ │
│ FLOW LAYER (Асинхронные потоки) │
│ ├── Flow<T> ← холодные потоки (pull model) │
│ ├── SharedFlow<T> ← горячие события (push model) │
│ ├── StateFlow<T> ← состояние с кешем │
│ └── Channel<T> ← синхронизация между корутинами │
│ │
│ CANCELLATION LAYER (Отмена) │
│ ├── cancel() ← инициация отмены │
│ ├── ensureActive() ← проверка статуса │
│ └── CancellationException ← сигнал отмены │
│ │
│ INTEGRATION LAYER (Фреймворки) │
│ ├── Spring WebFlux ← REST endpoints │
│ ├── Spring Data R2DBC ← БД доступ │
│ ├── Room (Android) ← SQLite │
│ └── Ktor ← HTTP client/server │
│ │
└─────────────────────────────────────────────────────────┘
📊 Выбор правильного инструмента
┌─────────────────────────────────────────────────────────────┐
│ DECISION TREE - ВЫБОР ИНСТРУМЕНТА │
└─────────────────────────────────────────────────────────────┘
1. Нужен ли результат выполнения?
│
├─ ✅ ДА:
│ └─ Используй async
│ ├─ Параллельное выполнение? → async + awaitAll()
│ └─ Последовательно? → async + await()
│
└─ ❌ НЕТ:
└─ Используй launch
├─ Side effects (логирование, отправка)
└─ Fire-and-forget операции
2. Работаешь с потоком данных?
│
├─ ✅ ДА - холодный поток:
│ └─ Используй Flow<T>
│ ├─ API вызовы → flow { emit() }
│ ├─ БД запросы → repository.dataFlow()
│ └─ Файловые операции → lines.asFlow()
│
└─ ✅ ДА - горячий поток:
├─ События → SharedFlow<Event>
└─ Состояние → StateFlow<State>
3. Нужна синхронизация между корутинами?
│
├─ ✅ ДА - один-к-одному:
│ └─ Channel<T>
│ ├─ Producer → send()
│ └─ Consumer → receive()
│
└─ ✅ ДА - один-ко-многим:
└─ SharedFlow или Channel (broadcast)
4. Нужен ли timeout?
│
├─ ✅ ДА:
│ ├─ withTimeout() → бросает TimeoutCancellationException
│ └─ withTimeoutOrNull() → возвращает null при timeout
│
└─ ❌ НЕТ: обычный код
5. Какая стратегия ошибок?
│
├─ ✅ "ВСЕ или НИЧЕГО" (критичные операции):
│ └─ coroutineScope { }
│ └─ Ошибка одного = отмена всех
│
└─ ✅ "ЧАСТИЧНЫЙ УСПЕХ" (независимые операции):
└─ supervisorScope { }
└─ Ошибка одного ≠ отмена других
6. Какой контекст?
│
├─ I/O операции (БД, сеть, файлы):
│ └─ Dispatchers.IO
│
├─ CPU-интенсивные:
│ └─ Dispatchers.Default
│
├─ UI операции (Android/Desktop):
│ └─ Dispatchers.Main
│
└─ Спец. случаи (тесты):
└─ Dispatchers.Unconfined
🔄 Сравнение Launch vs Async
TIMING DIAGRAM
==============
Без параллелизма (последовательно):
Operation 1: ▓▓▓▓▓▓▓▓ 1000ms
Operation 2: ▓▓▓▓▓▓▓▓ 1000ms
Operation 3: ▓▓▓▓▓▓▓▓ 1000ms
─────────────────────────────────────────────
Итого: 3000ms
С параллелизмом (async):
Operation 1: ▓▓▓▓▓▓▓▓ 1000ms ┐
Operation 2: ▓▓▓▓▓▓▓▓ 1000ms ├─ параллельно
Operation 3: ▓▓▓▓▓▓▓▓ 1000ms ┘
─────────────────────────────────────────────
Итого: ▓▓▓▓▓▓▓▓ 1000ms (максимум)
ОСОБЕННОСТЬ LAUNCH:
launch { } ← НЕ ждёт завершения
│
└─ Возвращает сразу (Job)
│
└─ Код выполняется в background
ОСОБЕННОСТЬ ASYNC:
async { } ← Ждёт await()
│
├─ Возвращает Deferred<T>
│
└─ Требует вызвать await()
│
└─ Блокирует вызывающую корутину
| Аспект | launch | async |
|---|---|---|
| Возвращаемое значение | Job | Deferred |
| Получение результата | Не предусмотрено | .await() |
| Назначение | Side effects | Параллельные вычисления |
| Исключения | Всплывают к родителю | Заморожены до await() |
| Блокировка | Не блокирует вызывающую | Блокирует на await() |
Практические примеры
// ❌ ПЛОХО - неправильное использование launch
suspend fun wrongUsage(): UserData {
var userData: UserData? = null
launch { // launch НЕ блокирует выполнение!
userData = fetchUserData()
}
return userData!! // ОШИБКА! userData всегда null
}
// ✅ ХОРОШО - launch для side effects
class NotificationService {
suspend fun processOrder(order: Order) {
// Основная логика
val result = validateAndProcessOrder(order)
// Независимые side effects - запускаются параллельно
launch { sendOrderConfirmation(order.email) }
launch { updateInventory(order.items) }
launch { logOrderEvent(order.id) }
launch { updateAnalytics(order) }
return result
}
}
// ✅ ХОРОШО - async для параллельных операций с результатом
suspend fun loadUserDashboard(userId: String): Dashboard = coroutineScope {
// Запускаем все запросы параллельно
val profileAsync = async { profileService.getProfile(userId) }
val ordersAsync = async { orderService.getOrders(userId) }
val preferencesAsync = async { prefsService.getPreferences(userId) }
// Ждём результатов (максимум времени самого долгого запроса)
Dashboard(
profile = profileAsync.await(),
orders = ordersAsync.await(),
prefs = preferencesAsync.await()
)
}
⏱️ Timeline диаграмма Structured Concurrency
PARENT COROUTINE LIFECYCLE
==========================
Parent launch/async
│
├─── Child 1 launch
│ └─ может упасть ❌
│
├─── Child 2 async
│ └─ suspended в await()
│
└─── Child 3 async
└─ в процессе выполнения
СЦЕНАРИЙ 1: Обычный Job (все связаны)
───────────────────────────────────────
Child 1 throws Exception
│
├─ отменяет Child 2
├─ отменяет Child 3
└─ отменяет Parent
Результат: ВСЕ завершены с ошибкой ❌
СЦЕНАРИЙ 2: SupervisorJob (изолированы)
─────────────────────────────────────────
Child 1 throws Exception
│
├─ Child 1 logs error ❌
├─ Child 2 continues ✓
├─ Child 3 continues ✓
└─ Parent continues ✓
Результат: Child 1 упал, остальные работают
🚦 Flow vs Reactive Streams
EXECUTION MODEL COMPARISON
===========================
Flow (Cold Stream):
User₁ subscribes User₂ subscribes
│ │
├─► Producer 1 ├─► Producer 2
│ (fresh) │ (fresh)
└─► Emit 1,2,3 └─► Emit 1,2,3
Каждый подписчик получает независимый поток
Rx Observable (Hot/Cold):
Producer (once)
│
├─► Observer₁ (может пропустить данные)
├─► Observer₂ (может пропустить данные)
└─► Observer₃ (может пропустить данные)
Один producerдля всех
StateFlow (Hot State):
[Current State: 42]
│
├─► Subscriber₁ (получает 42 сразу)
├─► Subscriber₂ (получает 42 сразу)
└─► Subscriber₃ (получает 42 сразу)
Новые подписчики получают последнее состояние
🔗 Dispatcher Selection Guide
REQUEST JOURNEY
================
┌─────────────────────────────────┐
│ Incoming Request │
└──────────────┬──────────────────┘
│
┌──────▼──────┐
│ withContext │
└──────┬──────┘
│
┌─────────┴──────────┬──────────────┐
│ │ │
│ │ │
IO Default/CPU Main/UI
─────────────────────────────────────────────
• БД запросы • Сортировка • Обновление UI
• HTTP calls • Парсинг JSON • Прогресс бары
• Файлы • Хеширование • Анимации
• Сокеты • Шифрование • Toast/Dialogs
Pool: 64+ Pool: CPU cores Pool: 1 (single)
threads threads thread
📝 Основы корутин
Suspend функции
// Suspend функция может быть приостановлена
suspend fun fetchUserData(id: String): UserData {
val profile = fetchProfile(id) // suspension point
val settings = fetchSettings(id) // suspension point
return UserData(profile, settings)
}
// Обычная функция НЕ может вызывать suspend напрямую
fun normalFunction() {
// fetchUserData("123") // ОШИБКА компиляции!
}
// Suspend функции можно вызывать только из:
// 1. Других suspend функций
// 2. Корутинных bilders (launch, async)
// 3. runBlocking (только для тестов/main)
Корутинные билдеры
// launch - Fire and Forget
val job: Job = scope.launch(Dispatchers.IO) {
doSomething() // не возвращает результат
}
job.cancel()
// async - Параллельное выполнение с результатом
val deferred: Deferred<String> = scope.async(Dispatchers.IO) {
computeSomething() // возвращает значение
}
val result = deferred.await()
// runBlocking - Синхронный мост (ТОЛЬКО для тестов!)
fun main() = runBlocking {
val result = suspendingFunction()
println(result)
}
CoroutineScope - область жизни
// GlobalScope - живёт вечно (ОПАСНО! утечки памяти)
GlobalScope.launch { /* может пережить ваш объект */ }
// Кастомный scope - контролируемый жизненный цикл
class UserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun loadData() = launch {
val data = fetchUserData()
updateCache(data)
}
fun cleanup() {
job.cancel() // отменяет все корутины
}
}
// viewModelScope (Android) - автоматическая отмена
class UserViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch { // автоматически отменится в onCleared()
val data = loadFromNetwork()
_uiState.value = UiState.Success(data)
}
}
}
🔄 Flow - асинхронные потоки
// Создание Flow
fun numbersFlow(): Flow<Int> = flow {
for (i in 1..5) {
delay(1000)
emit(i)
}
}
// Трансформация данных
fun processNumbers() = numbersFlow()
.filter { it % 2 == 0 }
.map { "Number: $it" }
.flowOn(Dispatchers.Default)
.onEach { log.info(it) }
.catch { exception -> emit("Error: ${exception.message}") }
// Потребление Flow
suspend fun consumeFlow() {
processNumbers().collect { value ->
println(value)
}
}
// StateFlow для управления состоянием
class UserViewModel {
private val _state = MutableStateFlow<UiState>(UiState.Loading)
val state: StateFlow<UiState> = _state.asStateFlow()
fun loadUser(id: String) {
viewModelScope.launch {
try {
_state.value = UiState.Loading
val user = userService.getUser(id)
_state.value = UiState.Success(user)
} catch (e: Exception) {
_state.value = UiState.Error(e.message)
}
}
}
}
🛡️ Отмена и обработка ошибок
// Отмена корутины
val job = launch { /* code */ }
job.cancel() // отменить
job.cancelAndJoin() // отменить и дождаться
// Проверка статуса отмены
suspend fun longRunningTask() {
repeat(1000) { i ->
ensureActive() // throws CancellationException если отменена
if (!isActive) { // мягкая проверка
cleanup()
return
}
doWork(i)
delay(10)
}
}
// Обработка исключений
val exceptionHandler = CoroutineExceptionHandler { _, exception ->
log.error("Coroutine failed", exception)
}
val scope = CoroutineScope(Dispatchers.IO + SupervisorJob() + exceptionHandler)
// Try-catch в корутинах
suspend fun safeOperation() {
try {
riskyOperation()
} catch (e: CancellationException) {
cleanup()
throw e // ВСЕГДА перебрасываем
} catch (e: Exception) {
handleError(e)
}
}
📌 Ключевые моменты
- Suspend функции — не блокируют потоки, приостанавливают выполнение
- launch — fire-and-forget для side effects
- async — параллельное выполнение с результатом через await()
- Dispatchers выбираются по типу операции (IO/Default/Main)
- Structured Concurrency автоматизирует управление жизненным циклом
- Flow — холодные потоки для трансформации данных
- StateFlow — горячее состояние с кешем последнего значения
- CancellationException всегда перебрасывать, не логировать как ошибку
- GlobalScope — антипаттерн, вызывает утечки памяти
- SupervisorJob — для изоляции ошибок независимых корутин
Основы корутин
Что такое корутина?
Корутина (Coroutine) — это легковесная единица асинхронного выполнения, которая может быть приостановлена (suspended) и возобновлена позже без блокировки потока. Корутины позволяют писать асинхронный код в последовательном стиле.
Ключевые концепции:
- Suspension — возможность приостановить выполнение функции
- Non-blocking — не блокируют поток во время ожидания
- Structured concurrency — корутины организованы в иерархию с автоматической отменой
- Cooperative multitasking — корутины добровольно отдают управление
// Простая корутина
suspend fun fetchUser(id: String): User {
delay(1000) // приостанавливает корутину, НЕ блокирует поток
return userRepository.findById(id)
}
📊 Отличия корутин от потоков
Потокоёмкость: Threads vs Coroutines
THREAD POOL MODEL (Traditional Java)
====================================
Thread Pool (10 потоков)
│
├─ Thread 1 [BlockedI/O: fetchDB()]
│ └─ Stack: ~1-2 MB
│
├─ Thread 2 [BlockedI/O: callAPI()]
│ └─ Stack: ~1-2 MB
│
├─ Thread 3 [BlockedI/O: readFile()]
│ └─ Stack: ~1-2 MB
│
├─ Thread 4-8 [Blocked/Idle...]
│ └─ Wasting ~8-10 MB!
│
└─ Thread 9-10 [Available]
Resources WASTED on blocking!
Can handle ~1000 concurrent requests
COROUTINE MODEL (Kotlin)
=========================
Dispatcher (2 active threads)
│
├─ Thread 1 ─────────────────────────────
│ ├─ Coroutine 1 [I/O] ──┐
│ ├─ Coroutine 2 [CPU] ──┤ All lightweight!
│ ├─ Coroutine 3 [I/O] ──┤ ~100 bytes each
│ └─ ... 10,000 more ──┘
│
└─ Thread 2 ─────────────────────────────
├─ Coroutine 10001 [I/O]
├─ Coroutine 10002 [CPU]
└─ ...
Resources EFFICIENT!
Can handle ~1 MILLION concurrent requests
Таблица сравнения: Thread vs Coroutine
| Аспект | Thread | Coroutine |
|---|---|---|
| Вес | ~1-8 MB стека | ~100 байт объекта |
| Создание | Дорогое (OS scheduling) | Дешёвое (~microseconds) |
| Переключение | Context switch (~microseconds) | Virtual (~nanoseconds) |
| Масштабируемость | ~1000 потоков max | ~1 000 000 корутин |
| Блокировка | Блокирует весь поток | Приостанавливает корутину |
| Memory footprint | 10,000 threads = 10-80GB | 10,000 корутин = ~1MB |
| Отмена | Unsafe (Thread.stop deprecated) | Safe (cooperative) |
| Синхронизация | Mutex, Semaphore, Lock | Suspend, Channel |
🔄 Практический пример: 1000 HTTP запросов
❌ Плохо: Thread Pool подход
// Используем Executor с 100 потоками
private val executor = Executors.newFixedThreadPool(100)
fun fetchUsersThreadApproach(userIds: List<String>) {
val futures = userIds.map { userId ->
executor.submit {
val user = blockingHttpCall("/users/$userId") // БЛОКИРУЕТ ПОТОК!
println("Loaded: $user")
}
}
futures.forEach { it.get() } // ждём всех
}
// Проблемы:
// 1. Нужно 100 потоков для 1000 запросов
// 2. Если поток блокируется, деградация производительности
// 3. Переключение контекста = дорого
// 4. ~800 MB памяти вплиялась!
✅ Хорошо: Coroutine подход
suspend fun fetchUsersCoroutineApproach(userIds: List<String>) = withContext(Dispatchers.IO) {
userIds.map { userId ->
async { // только запуск, не блокирует!
val user = apiClient.getUser(userId) // SUSPEND, не блокирует!
println("Loaded: $user")
user
}
}.awaitAll() // ждём всех результатов
}
// Преимущества:
// 1. Нужно 2 потока для 1000 запросов!
// 2. Приостановка = не теряется CPU
// 3. Переключение контекста между корутинами = дешёво
// 4. ~1 MB памяти!
📈 Batch обработка: когда нужны корутины
10 пользователей, 1 запрос на пользователя (100ms каждый)
SEQUENTIAL (обычный подход):
User 1 ▓▓▓▓▓▓▓▓ 100ms
User 2 ▓▓▓▓▓▓▓▓ 100ms
User 3 ▓▓▓▓▓▓▓▓ 100ms
...
User 10 ▓▓▓▓▓▓▓▓ 100ms
─────────────────────────────────────────────────── 1000ms TOTAL!
THREAD POOL (10 потоков):
User 1 ▓▓▓▓▓▓▓▓
User 2 ▓▓▓▓▓▓▓▓
User 3 ▓▓▓▓▓▓▓▓
...
User 10 ▓▓▓▓▓▓▓▓
─────────────────── ~100ms TOTAL (но использует 10 потоков!)
COROUTINES (2 потока):
User 1,2,3... (все параллельно)
Dispatchers.IO выполняет async/await
─────────────────── ~100ms TOTAL (использует 2 потока!)
🎯 Structured Concurrency - ключевая концепция
Problem without Structured Concurrency:
// ❌ ПЛОХО: запущенные корутины могут "утечь"
class OldWayService {
fun loadDataInBackground(userId: String) {
GlobalScope.launch { // ← НИКОГДА НЕ ОТМЕНЯЕТСЯ
val user = fetchUser(userId)
updateCache(user)
}
}
// Service удаляется, но корутина продолжает работать!
}
Solution with Structured Concurrency:
// ✅ ХОРОШО: корутины контролируются через scope
class GoodWayService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun loadDataInBackground(userId: String) {
launch { // привязано к scope
val user = fetchUser(userId)
updateCache(user)
}
}
fun cleanup() {
job.cancel() // отменяет ВСЕ корутины
}
}
// Service удаляется → coroutineContext отменяется → все корутины останавливаются
Диаграмма Structured Concurrency:
Parent Scope (User service)
│
├─ Coroutine 1: fetchUser ✓
├─ Coroutine 2: updateCache ✓
└─ Coroutine 3: syncDB ✓
Service destroyed
│
└─ ALL coroutines automatically cancelled ✓
🏗️ Executor vs CoroutineScope: Comparison
// EXECUTOR APPROACH (old way)
class OldExecutorService {
private val executor = Executors.newFixedThreadPool(10)
fun processItem(item: Item) {
executor.submit {
try {
val result = heavyComputation(item)
handleResult(result)
} catch (e: Exception) {
log.error("Error", e)
}
}
}
// No graceful way to cancel all pending tasks
fun shutdown() {
executor.shutdown() // crude shutdown
}
}
// COROUTINE APPROACH (modern way)
class ModernCoroutineService : CoroutineScope {
override val coroutineContext = Dispatchers.Default + SupervisorJob()
fun processItem(item: Item) {
launch {
try {
val result = heavyComputation(item)
handleResult(result)
} catch (e: Exception) {
log.error("Error", e)
}
}
}
// Graceful cancellation with cleanup
suspend fun shutdown() {
coroutineContext.job?.cancelAndJoin()
}
}
⚠️ Common Pitfall: ThreadLocal + Coroutines
// ThreadLocal работает ПО ПОТОКАМ, а корутины выполняются в разных потоках!
val requestId = ThreadLocal<String>()
suspend fun badApproach() {
requestId.set("req-123") // запустили в Main потоке
withContext(Dispatchers.IO) { // ПЕРЕКЛЮЧАЕМСЯ В ДРУГОЙ ПОТОК!
println(requestId.get()) // NULL! Данные потеряны!
}
}
// ✅ РЕШЕНИЕ: Используйте CoroutineContext элементы вместо ThreadLocal
class RequestId(val id: String) : AbstractCoroutineContextElement(RequestId) {
companion object Key : CoroutineContext.Key<RequestId>
}
suspend fun goodApproach() {
val requestId = RequestId("req-123")
withContext(Dispatchers.IO + requestId) { // передаём через контекст
val id = coroutineContext[RequestId]?.id // "req-123" ✓
println(id)
}
}
📋 Чеклист: Готовность к корутинам
🎓 Ключевые выводы
- Корутины ~ 1,000x легче потоков - можно создавать миллионы
- Structured Concurrency - автоматическое управление жизненным циклом
- Non-blocking - приостановка без потери CPU
- Cooperative - корутины сами решают когда остановиться
- Context-aware - используют CoroutineContext вместо ThreadLocal
- Composable - можно комбинировать с Flow и другими структурами
Ключевые строительные блоки
Suspend функции
Suspend функция — это функция, которая может быть приостановлена и возобновлена позже без блокировки потока. Ключевое слово suspend указывает компилятору, что функция может приостанавливать выполнение в suspension points.
Базовые принципы
// suspend функция может вызывать другие suspend функции
suspend fun fetchUserData(id: String): UserData {
val profile = fetchProfile(id) // suspension point
val settings = fetchSettings(id) // suspension point
return UserData(profile, settings)
}
// Обычная функция НЕ может вызывать suspend функции напрямую
fun normalFunction() {
// fetchUserData("123") // ОШИБКА компиляции!
}
// Suspend функции можно вызывать только из:
// 1. Других suspend функций
// 2. Корутинных builders (launch, async)
// 3. runBlocking (только для тестов/main)
Что происходит под капотом:
- Компилятор преобразует
suspendфункцию в state machine - В каждом suspension point функция может сохранить состояние и освободить поток
- При возобновлении состояние восстанавливается и выполнение продолжается
📊 Suspension Points - визуально
EXECUTION FLOW
==============
suspend fun processUser(userId: String): Result {
// STATE 0: Starting
val profile = fetchProfile(userId) ◄── SUSPENSION POINT 1
(может приостановиться)
// STATE 1: Profile loaded, resume here
val settings = fetchSettings(userId) ◄── SUSPENSION POINT 2
(может приостановиться)
// STATE 2: Settings loaded, resume here
return process(profile, settings)
}
ЧТО ВИДИТ КОМПИЛЯТОР:
═════════════════════
suspend fun processUser(...): Continuation<Result>
|
├─ switch(state) {
│ case 0 → fetchProfile() (возвращает suspend)
│ case 1 → fetchSettings() (возвращает suspend)
│ case 2 → return process()
│ }
|
└─ При suspend → сохраняем state и отпускаем поток
При resume → восстанавливаем state и продолжаем
🔄 Coroutine Builders: Запуск корутин
launch - Fire and Forget
// launch НЕ возвращает результат, нужен только для side effects
val job: Job = scope.launch(Dispatchers.IO) {
val userData = fetchUserData()
updateDatabase(userData) // side effect
sendNotification(userData) // side effect
}
// Отмена
job.cancel()
job.join() // ждём завершения
job.cancelAndJoin() // отмена + ждём
// ⏱️ TIMING DIAGRAM
┌─────────────────────────────────┐
│ launch { ... } returns Job ─────→ code continues immediately!
│ ├─ Task executing in background │
│ ├─ Main code doesn't wait │
│ └─ Fire and forget │
└─────────────────────────────────┘
Когда использовать launch:
- Side effects (логирование, кеширование, отправка событий)
- Fire-and-forget операции
- Параллельные фоновые задачи
async - Параллельное выполнение с результатом
// async возвращает Deferred<T>, нужно вызвать await()
val deferred: Deferred<String> = scope.async(Dispatchers.IO) {
computeExpensiveValue() // returns String
}
// Ждём результата
val result = deferred.await()
// ⏱️ TIMING DIAGRAM
┌──────────────────────────────────────┐
│ async { ... } returns Deferred ────→ code continues
│ ├─ Task executing in parallel │
│ ├─ Later: result = deferred.await() │
│ │ ← BLOCKS until ready │
│ └─ Get result when ready │
└──────────────────────────────────────┘
Когда использовать async:
- Нужен результат выполнения
- Параллельные операции с await()
- Комбинирование нескольких async результатов
❌ Частая ошибка: launch вместо async
// ❌ ПЛОХО: забываешь результат
suspend fun wrongApproach(): String {
var result: String? = null
launch { // launch НЕ ждёт!
result = expensiveComputation()
}
return result ?: "default" // ВСЕГДА null! launch не ждёт завершения
}
// ✅ ХОРОШО: используй async + await
suspend fun correctApproach(): String = coroutineScope {
val deferred = async { expensiveComputation() }
return@coroutineScope deferred.await()
}
// ✅ ИЛИ: используй launch + лучшую структуру
suspend fun anotherGoodApproach(): String = coroutineScope {
val result = async { expensiveComputation() }
return@coroutineScope result.await()
}
runBlocking - Синхронный мост (ТОЛЬКО для тестов!)
// runBlocking БЛОКИРУЕТ текущий поток! НИКОГДА в production!
fun main() = runBlocking { // ← БЛОКИРУЕТ основной поток
val user = fetchUserData() // suspend вызов
println(user)
}
// ⚠️ TIMING DIAGRAM - runBlocking опасен!
Main thread:
┌─────────────────────────────────┐
│ runBlocking { ... } │
│ ├─ Coroutine 1 executing │
│ ├─ THREAD BLOCKED! │ ← Весь поток замёрзнет!
│ └─ Processing... │
└─────────────────────────────────┘
0ms 1000ms 2000ms 3000ms
// ❌ НИКОГДА делайте так в production:
@RestController
class Controller {
@GetMapping("/users/{id}")
fun getUser(@PathVariable id: String): User {
return runBlocking { // ← ВЕСЬ THREAD POOL ЗАМЁРЗНЕТ!
userService.fetchUser(id)
}
}
}
📊 Dispatch Selection Guide
DISPATCHER CHARACTERISTICS
==========================
┌─────────────────────────────────────────────────────┐
│ Dispatchers.Main │
├─────────────────────────────────────────────────────┤
│ • Назначение: UI операции (Android/Desktop) │
│ • Pool size: 1 (single thread!) │
│ • Использование: updateUI(), setText() │
│ • Производительность: MUST BE FAST! │
│ • Timeout: <16ms (для 60fps) │
│ • Пример: │
│ withContext(Dispatchers.Main) { │
│ binding.userText.text = user.name │
│ } │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Dispatchers.IO │
├─────────────────────────────────────────────────────┤
│ • Назначение: I/O операции (сеть, файлы, БД) │
│ • Pool size: 64+ потоков (expandable!) │
│ • Использование: API calls, DB queries │
│ • Производительность: OPTIMIZED FOR BLOCKING │
│ • Timeout: Few seconds OK │
│ • Пример: │
│ withContext(Dispatchers.IO) { │
│ userRepository.findById(id) │
│ } │
└─────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────┐
│ Dispatchers.Default │
├─────────────────────────────────────────────────────┤
│ • Назначение: CPU-intensive операции │
│ • Pool size: CPU cores (fixed!) │
│ • Использование: Sorting, Parsing, Encryption │
│ • Производительность: OPTIMIZED FOR CPU │
│ • Timeout: Variable (depends on algorithm) │
│ • Пример: │
│ withContext(Dispatchers.Default) { │
│ expensiveJsonParsing(jsonString) │
│ } │
└─────────────────────────────────────────────────────┘
DISPATCHER SELECTION DECISION TREE
═══════════════════════════════════
Что делаешь?
│
├─ I/O операция (БД, HTTP, файлы)?
│ └─ Dispatchers.IO
│
├─ CPU операция (сортировка, парсинг, шифрование)?
│ └─ Dispatchers.Default
│
├─ UI операция (обновление экрана)?
│ └─ Dispatchers.Main
│
└─ Уникальный случай?
└─ Dispatchers.Unconfined (very rare!)
💡 Практический пример: Параллельная загрузка
// ⏱️ TIMING DIAGRAM
Sequential (неправильно):
┌─────────────────────────────────────┐
│ fetchUser() ▓▓▓ 1000ms │
│ fetchOrders() ▓▓▓ 1000ms │
│ fetchSettings() ▓▓▓ 1000ms │
│ Total: 3000ms │
└─────────────────────────────────────┘
Parallel (правильно):
┌─────────────────┐
│ fetchUser() ▓▓▓ │ 1000ms
│ fetchOrders() ▓▓ │ 1000ms (parallel!)
│ fetchSettings() ▓ │ 1000ms (parallel!)
│ Total: 1000ms │
└─────────────────┘
ВОТ КОД:
// ❌ ПЛОХО: последовательно
suspend fun slowDashboard(userId: String): Dashboard {
val user = userService.getUser(userId) // 1000ms
val orders = orderService.getOrders(userId) // 1000ms
val prefs = prefService.getPrefs(userId) // 1000ms
return Dashboard(user, orders, prefs) // Итого: 3000ms ❌
}
// ✅ ХОРОШО: параллельно
suspend fun fastDashboard(userId: String): Dashboard = coroutineScope {
val userAsync = async { userService.getUser(userId) } // 1000ms
val ordersAsync = async { orderService.getOrders(userId) } // 1000ms
val prefsAsync = async { prefService.getPrefs(userId) } // 1000ms
// awaitAll ждёт всех результатов
return@coroutineScope Dashboard(
user = userAsync.await(),
orders = ordersAsync.await(),
prefs = prefsAsync.await()
) // Итого: 1000ms ✅
}
withContext() - Переключение контекста
// Переключаемся между dispatchers без создания новой корутины
suspend fun complexOperation() = withContext(Dispatchers.IO) {
val data = loadFromNetwork() // в Dispatchers.IO потоке
withContext(Dispatchers.Default) {
val processed = expensiveProcessing(data) // в Default потоке
}
withContext(Dispatchers.Main) {
displayResult(processed) // в Main потоке
}
}
// ⏱️ TIMING: All in SAME coroutine, different threads
Main thread ────┐
│
IO threads ──│── loadFromNetwork
Default threads ├── expensiveProcessing
Main thread ──┘
└─ displayResult
🛡️ CoroutineScope - управление жизненным циклом
// ❌ ПЛОХО: GlobalScope - утечка памяти
class UserService {
fun loadData(userId: String) {
GlobalScope.launch { // НИКОГДА не отменяется!
val data = fetchUserData(userId)
updateCache(data)
}
}
}
// ✅ ХОРОШО: управляемый CoroutineScope
class UserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun loadData(userId: String) {
launch { // привязано к scope
val data = fetchUserData(userId)
updateCache(data)
}
}
fun cleanup() {
job.cancel() // отменяет ВСЕ корутины
}
}
// Android пример (ViewModel)
class UserViewModel : ViewModel() {
fun loadData(userId: String) {
viewModelScope.launch { // автоматически отменяется в onCleared()
val user = userService.getUser(userId)
_state.value = UiState.Success(user)
}
}
// viewModelScope.cancel() вызывается автоматически!
}
📋 Чеклист: Правильное использование builders
🎓 Ключевые выводы
- launch = fire-and-forget для side effects
- async = параллельное выполнение с результатом (await())
- Dispatcher = выбираем по типу операции (IO/Default/Main)
- runBlocking = ТОЛЬКО для тестов и main()!
- CoroutineScope = контролируем жизненный цикл
- GlobalScope = антипаттерн, вызывает утечки
- withContext() = переключение контекста без новой корутины
Structured Concurrency
📊 Иерархия корутин - диаграмма
STRUCTURED CONCURRENCY HIERARCHY
=================================
Parent Scope (root)
├─ Context: Dispatchers.IO + SupervisorJob()
│
├─ Child Job 1 ✓ Success
│ └─ завершена нормально
│
├─ Child Job 2 ✗ Exception
│ └─ упала с ошибкой
│
└─ Child Job 3 → Cancelled
└─ отменена при ошибке Job 2
ОБЫЧНЫЙ JOB (ERROR PROPAGATION)
────────────────────────────────
Parent Job
▲
│ (CancellationException)
│
┌───────────────┼───────────────┐
│ │ │
Child 1 ✓ Child 2 ✗ Child 3 ❌
Success throws error cancelled!
Результат: ОДИН УПАЛ - ВСЕ УПАЛИ!
SUPERVISORJOB (ERROR ISOLATION)
────────────────────────────────
Parent Supervisor
│
┌────────────────┼────────────────┐
│ │ │
Child 1 ✓ Child 2 ✗ Child 3 ✓
Success error isolated continues!
Результат: ОДИН УПАЛ - ОСТАЛЬНЫЕ РАБОТАЮТ!
🎯 Когда использовать coroutineScope vs supervisorScope
DECISION TREE
=============
Нужно гарантировать успех ВСЕ или НИЧЕГО?
│
├─ ✅ ДА (критичные данные, транзакция, платёж)
│ └─ coroutineScope {
│ val result1 = async { criticalOp1() } // если упадёт
│ val result2 = async { criticalOp2() } // отменяет result2
│ val result3 = async { criticalOp3() } // отменяет result3
│ }
│
└─ ❌ НЕТ (независимые операции, dashboard, сбор данных)
└─ supervisorScope {
val profile = async { loadProfile() } // если упадёт
val orders = async { loadOrders() } // продолжит работу
val prefs = async { loadPrefs() } // продолжит работу
}
ПРАКТИЧЕСКИЕ ПРИМЕРЫ
====================
Финансовая транзакция (coroutineScope - "ВСЕ или НИЧЕГО"):
────────────────────────────────────────────────────────
suspend fun processPayment(payment: Payment) = coroutineScope {
val validation = async { validatePayment(payment) }
val authorization = async { authorizeCard(payment) }
val processing = async { processTransaction(payment) }
// Если любой шаг упадёт - вся транзакция отменяется
val results = awaitAll(validation, authorization, processing)
return@coroutineScope PaymentResult(results)
}
Загрузка Dashboard (supervisorScope - "ЧАСТИЧНЫЙ УСПЕХ"):
──────────────────────────────────────────────────────────
suspend fun loadDashboard(userId: String) = supervisorScope {
val weather = async {
try { weatherService.get() }
catch (e: Exception) { null } // fallback
}
val news = async {
try { newsService.get() }
catch (e: Exception) { emptyList() }
}
val stocks = async {
try { stockService.get(userId) }
catch (e: Exception) { emptyList() }
}
// Показываем то что удалось загрузить
return@supervisorScope Dashboard(
weather = weather.await(),
news = news.await(),
stocks = stocks.await()
)
}
Микросервисный шлюз (supervisorScope - независимые сервисы):
─────────────────────────────────────────────────────────────
suspend fun fetchDashboardData(userId: String) = supervisorScope {
val user = async {
try { userService.getUser(userId) }
catch (e: Exception) { null }
}
val orders = async {
try { orderService.getOrders(userId) }
catch (e: Exception) { emptyList() }
}
val notifications = async {
try { notificationService.get(userId) }
catch (e: Exception) { emptyList() }
}
// Один упавший сервис не ломает весь dashboard
DashboardData(
user = user.await(),
orders = orders.await(),
notifications = notifications.await()
)
}
🔄 Сравнение Job типов
| Аспект | Job | SupervisorJob |
|---|---|---|
| Ошибка дочерней | Отменяет всех siblings | Изолирует от siblings |
| Родитель ждёт | Да, всех завершения | Да, всех завершения |
| Отмена родителя | Отменяет всех детей | Отменяет всех детей |
| Use case | Критичные операции | Независимые операции |
| Отмена одного child | Отменяет остальных | НЕ влияет на остальных |
💡 Комбинированный подход
// Часто используется комбинация coroutineScope + supervisorScope
class OrderProcessingService : CoroutineScope {
private val supervisorJob = SupervisorJob()
override val coroutineContext = Dispatchers.IO + supervisorJob
suspend fun processOrder(order: Order): OrderResult = supervisorScope {
// КРИТИЧНАЯ часть - strict иерархия
val coreResult = async {
coroutineScope { // ВСЕ или НИЧЕГО
val validation = async { validateOrder(order) }
val inventory = async { reserveInventory(order) }
val payment = async { processPayment(order) }
CoreOrderData(
validation.await(),
inventory.await(),
payment.await()
)
}
}
// ДОПОЛНИТЕЛЬНЫЕ сервисы - graceful degradation
val notifications = async {
try { sendNotification(order) }
catch (e: Exception) { false }
}
val analytics = async {
try { trackOrderEvent(order) }
catch (e: Exception) { false }
}
OrderResult(
core = coreResult.await(),
notified = notifications.await(),
tracked = analytics.await()
)
}
}
🛡️ Обработка ошибок в структурированной конкуренции
// Паттерн: Exception Handler + SupervisorJob + Try-Catch
val scope = CoroutineScope(
Dispatchers.IO +
SupervisorJob() +
CoroutineExceptionHandler { _, exception ->
log.error("Root exception handler", exception)
metricsService.recordException(exception)
}
)
scope.launch {
supervisorScope {
val task1 = async {
try { riskyOp1() }
catch (e: SpecificException) {
log.warn("Task 1 failed", e)
null // fallback
}
}
val task2 = async {
try { riskyOp2() }
catch (e: SpecificException) {
log.warn("Task 2 failed", e)
null // fallback
}
}
val task3 = async {
try { riskyOp3() }
catch (e: Exception) {
log.error("Task 3 critical failure", e)
throw e // propagate critical errors
}
}
try {
val results = awaitAll(task1, task2, task3)
processResults(results)
} catch (e: CancellationException) {
cleanup()
throw e // ALWAYS rethrow
}
}
}
📋 Чеклист Structured Concurrency
- Используете ли вы
GlobalScope? → ЗАМЕНИТЕ наCoroutineScope - Отменяются ли корутины при завершении scope? → Убедитесь в
- Обработаны ли исключения? → Добавьте
- Правильный Job тип? →
Jobдля "всё или ничего",SupervisorJob - Проверяется ли
isActive - Закрываются ли ресурсы? → Используйте
try-finally
🚀 Практические паттерны
Паттерн 1: Graceful Shutdown
class ServiceManager {
private val scope = CoroutineScope(
Dispatchers.IO + SupervisorJob() +
CoroutineName("ServiceManager")
)
fun startServices() {
scope.launch {
launch { backgroundCleanup() }
launch { reportingTask() }
launch { healthCheckTask() }
}
}
suspend fun shutdown() {
scope.coroutineContext[Job]?.let { job ->
log.info("Shutting down ServiceManager...")
job.cancel()
try {
withTimeout(30_000) {
job.join() // wait for graceful shutdown
}
} catch (e: TimeoutCancellationException) {
log.warn("Shutdown timeout, forcing termination")
}
}
}
}
Паттерн 2: Batch Processing
suspend fun processBatchIndependently(items: List<Item>) = supervisorScope {
val results = items.map { item ->
async {
try {
processItem(item)
} catch (e: Exception) {
log.error("Failed to process item ${item.id}", e)
ProcessingResult.Failed(item.id, e)
}
}
}.awaitAll()
val successful = results.filterIsInstance<ProcessingResult.Success>()
val failed = results.filterIsInstance<ProcessingResult.Failed>()
log.info("Batch processing: ${successful.size} success, ${failed.size} failed")
return@supervisorScope BatchResult(successful, failed)
}
Отмена и обработка исключений
Механизм отмены корутин
Отмена корутин — это кооперативный процесс, где корутина добровольно проверяет свой статус и прекращает работу. В отличие от Thread.interrupt(), отмена корутин безопасна и предсказуема.
Ключевые принципы отмены
- Кооперативность — корутина сама решает, когда остановиться
- Безопасность — отмена не прерывает операцию посередине
- Структурированность — отмена распространяется по иерархии
- Исключение как сигнал — CancellationException указывает на отмену
📊 Распространение исключений: launch vs async
LAUNCH - исключение сразу всплывает вверх
═══════════════════════════════════════════
scope.launch {
try {
child1Launch() // ← throws Exception
} catch (e: Exception) {
// Обработано
}
}
РЕЗУЛЬТАТ: Исключение всплывает СРАЗУ!
│
├─ CoroutineExceptionHandler (если есть)
│
└─ Parent Job cancelled ❌
ASYNC - исключение заморожено до await()
═════════════════════════════════════════
val deferred = scope.async {
try {
child1Async() // ← throws Exception
} catch (e: Exception) {
// Обработано
}
}
// Исключение ЗАМОРОЖЕНО в Deferred
val result = deferred.await() // ← Exception выбросится ТУТ!
РЕЗУЛЬТАТ: Исключение скрыто до await()
│
├─ Можно обработать в try-catch
│
└─ Parent НЕ затронут!
🎯 Способы отмены корутин
val job = launch {
repeat(1000) { i ->
println("Работаю: $i")
delay(500) // suspension point - место для проверки отмены
}
}
// СПОСОБ 1: Простая отмена
job.cancel() // Отменить (некоопера тивно)
// СПОСОБ 2: Отмена с сообщением
job.cancel(CancellationException("Превышен таймаут"))
// СПОСОБ 3: Отмена + ожидание завершения
job.cancelAndJoin() // отмена + join()
// ⏱️ TIMING DIAGRAM
┌──────────────────────────────┐
│ launch { repeat(1000) } │
├──────────────────────────────┤
│ Iteration 1 ▓▓ 500ms │
│ Iteration 2 ▓▓ 500ms │
│ job.cancel() called ─────────┼─→ CancellationException thrown
│ Iteration 3 [CANCELLED] │
│ Cleanup │
└──────────────────────────────┘
✅ Проверка статуса отмены
Способ 1: ensureActive() - строгий контроль
suspend fun processingWithEnsureActive() {
repeat(1000) { iteration ->
// Throws CancellationException если отменена!
ensureActive()
doWork(iteration)
delay(100)
}
}
// Пример с обработкой
suspend fun safeProcessing() {
try {
repeat(1000) { iteration ->
ensureActive() // ← бросает CancellationException
doWork(iteration)
}
} catch (e: CancellationException) {
cleanup()
throw e // ВСЕГДА перебросить!
}
}
Способ 2: isActive - мягкая проверка
suspend fun processingWithIsActive() {
repeat(1000) { iteration ->
if (!isActive) { // мягкая проверка
cleanup()
return // выход без исключения
}
doWork(iteration)
delay(100)
}
}
// Пример с логированием
suspend fun gracefulProcessing() {
repeat(1000) { iteration ->
if (!isActive) {
log.info("Cancelled at iteration $iteration")
return
}
doWork(iteration)
}
}
Разница между ensureActive и isActive
| Аспект | ensureActive() | isActive |
|---|---|---|
| Тип проверки | Strict | Soft |
| При отмене | throws CancellationException | возвращает false |
| Использование | Когда нужна обработка исключения | Когда нужна логика выхода |
| Performance | Немного медленнее | Быстро |
| Рекомендация | Для critial sections | Для loops/iterations |
🚨 Правильная обработка CancellationException
❌ ПЛОХО: логирование CancellationException как ошибки
// ❌ НЕПРАВИЛЬНО!
suspend fun badErrorHandling() {
try {
longRunningOperation()
} catch (e: Exception) {
log.error("Operation failed", e) // логируем отмену как ошибку!
}
}
// Результат: В логах увидим кучу CancellationException,
// хотя это нормальная часть жизненного цикла
✅ ХОРОШО: правильная обработка
// ✅ ПРАВИЛЬНО!
suspend fun goodErrorHandling() {
try {
longRunningOperation()
} catch (e: CancellationException) {
log.info("Operation cancelled")
cleanup()
throw e // ВСЕГДА перебросить CancellationException!
} catch (e: Exception) {
log.error("Operation failed", e)
throw e
}
}
// ИЛИ более краткий вариант:
suspend fun goodErrorHandlingShort() {
try {
longRunningOperation()
} finally {
cleanup() // выполнится в обоих случаях
}
}
💡 Practical patterns: Circuit Breaker
class CircuitBreakerService {
private var failureCount = 0
private var lastFailureTime = 0L
private val circuitBreakerTimeout = 30_000L
suspend fun <T> executeWithCircuitBreaker(
operationName: String,
block: suspend () -> T
): T {
if (isCircuitOpen()) {
throw CircuitBreakerOpenException(
"$operationName is temporarily unavailable"
)
}
return try {
val result = withTimeout(5000) { // 5 second timeout
block()
}
resetFailureCount()
result
} catch (e: TimeoutCancellationException) {
recordFailure()
throw ServiceTimeoutException("$operationName timed out", e)
} catch (e: Exception) {
recordFailure()
throw e
}
}
private fun isCircuitOpen(): Boolean {
return failureCount >= 5 &&
(System.currentTimeMillis() - lastFailureTime) < circuitBreakerTimeout
}
private fun recordFailure() {
failureCount++
lastFailureTime = System.currentTimeMillis()
}
private fun resetFailureCount() {
failureCount = 0
}
}
// Использование
val breaker = CircuitBreakerService()
suspend fun loadData(userId: String): UserData {
return breaker.executeWithCircuitBreaker("userService") {
apiClient.getUser(userId) // будет отменена если > 5s
}
}
📊 Сравнение Job vs SupervisorJob
EXCEPTION PROPAGATION
═════════════════════
ОБЫЧНЫЙ JOB (STRICT)
────────────────────
Parent Job
├─ Child 1: async { ... }
├─ Child 2: async { ... } ← throws Exception
└─ Child 3: async { ... }
Результат:
├─ Child 2 упал с Exception ❌
├─ Child 1 ОТМЕНЕНА ❌❌
├─ Child 3 ОТМЕНЕНА ❌❌
└─ Parent ОТМЕНЁН ❌❌
Все упали!
SUPERVISORJOB (LENIENT)
──────────────────────
Parent SupervisorJob
├─ Child 1: async { ... } ✓
├─ Child 2: async { ... } ← throws Exception ❌
└─ Child 3: async { ... } ✓
Результат:
├─ Child 2 упал с Exception ❌ (изолирован!)
├─ Child 1 продолжает работу ✓
├─ Child 3 продолжает работу ✓
└─ Parent продолжает работу ✓
Остальные работают!
| Аспект | Job | SupervisorJob |
|---|---|---|
| Ошибка дочерней | Отменяет всех siblings | Не влияет на других |
| Отмена родителя | Отменяет всех детей | Отменяет всех детей |
| Parentчка ждёт | Да, завершения всех | Да, завершения всех |
| Use case | Транзакции, critical ops | Dashboard, batch ops |
🛡️ Practical patterns: Retry with Exponential Backoff
suspend inline fun <T> withRetry(
times: Int = 3,
initialDelay: Long = 100,
maxDelay: Long = 10000,
factor: Double = 2.0,
shouldRetry: (Exception) -> Boolean = { it is IOException },
crossinline block: suspend () -> T
): T {
var currentDelay = initialDelay
var lastException: Exception? = null
repeat(times - 1) { attempt ->
try {
return block()
} catch (e: CancellationException) {
throw e // НИКОГДА не ретраим отмену!
} catch (e: Exception) {
if (!shouldRetry(e)) throw e
lastException = e
log.warn("Attempt ${attempt + 1}/$times failed, " +
"retrying in ${currentDelay}ms", e)
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
}
return try {
block() // последняя попытка
} catch (e: Exception) {
throw lastException ?: e
}
}
// Использование
suspend fun reliableApiCall(url: String): String {
return withRetry(
times = 3,
initialDelay = 100,
shouldRetry = { it is IOException || it is TimeoutException }
) {
httpClient.get(url)
}
}
🎯 Graceful shutdown с обработкой отмены
class DataProcessor : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
private val _state = MutableStateFlow<State>(State.IDLE)
val state = _state.asStateFlow()
fun start() {
launch {
repeat(Int.MAX_VALUE) { iteration ->
try {
ensureActive() // проверка отмены
val data = fetchBatch()
processBatch(data)
_state.value = State.PROCESSING(iteration)
delay(1000) // work interval
} catch (e: CancellationException) {
log.info("Processing cancelled at iteration $iteration")
_state.value = State.CANCELLED
saveProgress(iteration)
throw e
}
}
}
}
suspend fun gracefulShutdown(timeout: Long = 30_000) {
log.info("Starting graceful shutdown...")
_state.value = State.SHUTTING_DOWN
try {
withTimeout(timeout) {
job.cancelAndJoin()
}
} catch (e: TimeoutCancellationException) {
log.error("Shutdown timeout, forcing termination")
job.cancel()
}
log.info("Shutdown complete")
}
}
// Использование
val processor = DataProcessor()
runtime.addShutdownHook(Thread {
runBlocking {
processor.gracefulShutdown(timeout = 30_000)
}
})
📋 Чеклист: Обработка отмены
🎓 Ключевые выводы
- CancellationException - не ошибка, перебросьте её!
- ensureActive() - для строгого контроля в critical sections
- isActive - для graceful exit из циклов
- Job - для "всё или ничего" операций
- SupervisorJob - для независимых операций
- try-finally - для гарантированного cleanup
- Retry с backoff - для нестабильных операций
- Graceful shutdown - обязателен в production
CoroutineContext
Что такое CoroutineContext?
CoroutineContext — это индексированный набор элементов, который определяет окружение выполнения корутины. Это immutable коллекция, где каждый элемент имеет уникальный ключ (Key) и может быть получен по этому ключу.
Основные принципы:
- Композиция — элементы объединяются через оператор
+ - Наследование — дочерние корутины наследуют контекст родителя
- Переопределение — новые элементы заменяют старые с тем же ключом
- Неизменяемость — создаётся новый контекст при изменениях
// Базовая структура CoroutineContext
val context: CoroutineContext =
Dispatchers.IO + // где выполняется
SupervisorJob() + // управление жизненным циклом
CoroutineName("ApiService") + // имя для отладки
CoroutineExceptionHandler { _, ex -> // обработка исключений
log.error("Unhandled exception", ex)
}
📊 Структура и композиция CoroutineContext
CoroutineContext = Indexed Set of Elements
═══════════════════════════════════════════
context: CoroutineContext
│
├─ Dispatcher (Key: ContinuationInterceptor)
│ ├─ Value: Dispatchers.IO
│ └─ Определяет: где выполняется корутина
│
├─ Job (Key: Job)
│ ├─ Value: SupervisorJob()
│ └─ Определяет: иерархия и жизненный цикл
│
├─ CoroutineName (Key: CoroutineName)
│ ├─ Value: "ApiService"
│ └─ Определяет: имя для отладки
│
└─ CoroutineExceptionHandler (Key: CoroutineExceptionHandler)
├─ Value: { _, ex -> log.error(...) }
└─ Определяет: обработка необработанных исключений
КОМПОЗИЦИЯ ЧЕРЕЗ ОПЕРАТОР +
═════════════════════════════
val dispatcher = Dispatchers.IO
val job = SupervisorJob()
val name = CoroutineName("DataFetcher")
val context1 = dispatcher + job
val context2 = context1 + name
// Результат: context2 содержит все три элемента!
// Переопределение:
val contextv2 = context2 + Dispatchers.Default
// ↑ заменяет Dispatchers.IO на Dispatchers.Default
🔄 Наследование контекста
PARENT-CHILD CONTEXT INHERITANCE
═════════════════════════════════
Parent Coroutine
context: [Dispatchers.IO, Job1, CoroutineName("Parent")]
│
├─ launch { } // наследует контекст родителя
│ context: [Dispatchers.IO, Job1, CoroutineName("Parent")]
│ ├─ ChildJob = Job() (новый Job из Parent context!)
│ └─ Dispatcher, Name наследованы
│
└─ launch(Dispatchers.Default) { } // переопределение
context: [Dispatchers.Default, Job1, CoroutineName("Parent")]
└─ Dispatcher переопределён, Job наследован
ПЕРЕОПРЕДЕЛЕНИЕ В ВЛОЖЕННЫХ КОРУТИНАХ
═══════════════════════════════════════
fun parentScope() = CoroutineScope(
Dispatchers.IO +
CoroutineName("ParentScope")
)
parentScope().launch {
// context = [Dispatchers.IO, CoroutineName("ParentScope")]
launch(CoroutineName("ChildTask")) {
// context = [Dispatchers.IO, CoroutineName("ChildTask")]
// ↑ Name переопределена!
}
withContext(Dispatchers.Default) {
// context = [Dispatchers.Default, CoroutineName("ParentScope")]
// ↑ Dispatcher переопределён!
}
}
📋 Основные элементы CoroutineContext
Dispatcher - где выполняется
// DISPATCHER LAYER IN CONTEXT
val ioContext = Dispatchers.IO
// Key: ContinuationInterceptor
// Value: CoroutineDispatcher (IO pool)
val context = Dispatchers.IO + SupervisorJob()
val dispatcher = context[ContinuationInterceptor]
// Использование:
launch(context) {
// выполнится в Dispatchers.IO потоке
}
withContext(Dispatchers.Default) {
// переключится в Default поток (CPU cores)
}
Job - управление жизненным циклом
// JOB LAYER IN CONTEXT
val parentJob = SupervisorJob()
val context = parentJob + CoroutineName("Main")
scope.launch(context) {
val currentJob = coroutineContext[Job] ✓
// currentJob заменяется на Job() для этой корутины!
println(currentJob.isActive)
}
// Отмена через context:
context.cancel() // отменяет все корутины с этим контекстом
CoroutineName - для отладки
// COROUTINE NAME FOR DEBUGGING
val context = CoroutineName("DataFetcher")
launch(context) {
val name = coroutineContext[CoroutineName]?.name
println(name) // "DataFetcher"
}
// Использование в логах:
class LoggingInterceptor {
fun log(msg: String) {
val coroutineName = coroutineContext[CoroutineName]?.name ?: "unknown"
logger.info("[$coroutineName] $msg")
}
}
CoroutineExceptionHandler - обработка исключений
// EXCEPTION HANDLER IN CONTEXT
val exceptionHandler = CoroutineExceptionHandler { _, exception ->
log.error("Unhandled coroutine exception", exception)
metricsService.recordException(exception)
}
val context = SupervisorJob() + exceptionHandler
scope.launch(context) {
throw RuntimeException("Oops!")
// → exceptionHandler будет вызван
}
// ВАЖНО: ExceptionHandler только для launch()
scope.async(context) {
throw RuntimeException("Oops!")
// → async НЕ вызывает handler, исключение идёт в await()
}
🛡️ Кастомные элементы контекста
// Пример 1: Request ID для логирования
class RequestId(val id: String) :
AbstractCoroutineContextElement(RequestId) {
companion object Key : CoroutineContext.Key<RequestId>
}
suspend fun complexOperation(requestId: String) {
withContext(RequestId(requestId)) {
service1() // может получить requestId из контекста
service2() // может получить requestId из контекста
}
}
// Использование:
fun getRequestId(): String? {
return coroutineContext[RequestId]?.id
}
// Пример 2: Custom Dispatcher для ограничения параллелизма
val limitedIO = Dispatchers.IO.limitedParallelism(4)
val context = limitedIO + CoroutineName("LimitedService")
launch(context) {
// максимум 4 одновременные операции
}
// Пример 3: Metrics контекст
class MetricsContext(val serviceName: String) :
AbstractCoroutineContextElement(MetricsContext) {
companion object Key : CoroutineContext.Key<MetricsContext>
fun recordTime(operationName: String, duration: Long) {
meterRegistry.timer("$serviceName.$operationName")
.record(duration, TimeUnit.MILLISECONDS)
}
}
val metricsContext = MetricsContext("UserService")
launch(metricsContext) {
val start = System.currentTimeMillis()
try {
fetchUser()
} finally {
val duration = System.currentTimeMillis() - start
coroutineContext[MetricsContext]?.recordTime("fetchUser", duration)
}
}
📊 Получение элементов из контекста
// Способ 1: Прямое получение по Key
val context = Dispatchers.IO + CoroutineName("Service")
val dispatcher = context[ContinuationInterceptor]
val name = context[CoroutineName]
println(dispatcher) // Dispatchers.IO
println(name?.name) // "Service"
// Способ 2: Из coroutineContext suspend функции
suspend fun myFunction() {
val currentJob = coroutineContext[Job]
val currentName = coroutineContext[CoroutineName]?.name
val currentDispatcher = coroutineContext[ContinuationInterceptor]
println("Job: $currentJob")
println("Name: $currentName")
println("Dispatcher: $currentDispatcher")
}
// Способ 3: Все элементы контекста
suspend fun printAllContextElements() {
coroutineContext.forEach { element ->
println("Element: ${element.key} = ${element.value}")
}
}
⚠️ Осторожно: ThreadLocal vs CoroutineContext
// ❌ ПЛОХО: ThreadLocal НЕ работает с корутинами!
val userId = ThreadLocal<String>()
suspend fun badApproach() {
userId.set("user-123")
withContext(Dispatchers.IO) { // ПЕРЕКЛЮЧЕНИЕ ПОТОКА!
println(userId.get()) // NULL! Данные потеряны!
}
}
// ✅ ХОРОШО: Используйте CoroutineContext элементы
class UserId(val value: String) :
AbstractCoroutineContextElement(UserId) {
companion object Key : CoroutineContext.Key<UserId>
}
suspend fun goodApproach() {
withContext(UserId("user-123")) {
// Any dispatcher!
val userId = coroutineContext[UserId]?.value
println(userId) // "user-123" ✓ ВСЕГДА работает!
}
}
// ✅ ДАЖЕ ЛУЧШЕ: Использование с nested calls
suspend fun complexCall(userId: String) {
withContext(UserId(userId)) {
service1() // может получить userId
service2() // может получить userId
}
}
suspend fun service1() {
// Получение userId из контекста
val userId = coroutineContext[UserId]?.value
log.info("Service1 called for user: $userId")
}
🎯 Практический пример: RequestContext для микросервисов
data class RequestContext(
val requestId: String,
val userId: String,
val traceId: String
) : AbstractCoroutineContextElement(RequestContext) {
companion object Key : CoroutineContext.Key<RequestContext>
}
// Middleware
class CoroutineContextMiddleware {
suspend fun <T> processRequest(
request: HttpRequest,
handler: suspend () -> T
): T {
val context = RequestContext(
requestId = UUID.randomUUID().toString(),
userId = request.header("X-User-Id") ?: "anonymous",
traceId = request.header("X-Trace-Id") ?: UUID.randomUUID().toString()
)
return withContext(context) {
handler()
}
}
}
// Services могут получить контекст
class UserService {
suspend fun getUser(userId: String): User {
val context = coroutineContext[RequestContext]
log.info("[${context?.requestId}] Getting user $userId")
return withContext(Dispatchers.IO) {
// RequestContext автоматически наследуется!
repository.findById(userId)
}
}
}
// Logging interceptor
class ContextAwareLogger {
fun info(message: String) {
val requestId = coroutineContext[RequestContext]?.requestId ?: "-"
val userId = coroutineContext[RequestContext]?.userId ?: "-"
logger.info("[$requestId][$userId] $message")
}
}
📋 Чеклист: Работа с CoroutineContext
🎓 Ключевые выводы
- CoroutineContext = индексированный набор элементов
- Композиция через оператор
+ - Наследование - дочерние наследуют родительский контекст
- Переопределение - новые элементы заменяют старые
- Dispatcher - где выполняется корутина
- Job - управление жизненным циклом
- CoroutineName - для отладки
- ExceptionHandler - обработка необработанных исключений
- Кастомные элементы - для application-specific данных
- CoroutineContext вместо ThreadLocal - в корутинах!
Kotlin Flow
📊 Flow vs RxJava vs Reactive Streams
COMPARISON MATRIX
=================
FLOW RxJava Reactor SharedFlow
┌──────────────────────────────────────────────────────────────┐
│ Холодный/Горячий COLD COLD/HOT COLD/HOT HOT │
│ Платформа Multiplatform Android JVM/Java Kotlin │
│ Thread safety suspend-safe safe safe safe │
│ Простота ★★★★★ ★★★ ★★ ★★★ │
│ Производительность ★★★★ ★★★★ ★★★★ ★★★★ │
│ Use case Kotlin apps Android Reactive Events │
└──────────────────────────────────────────────────────────────┘
ХОЛОДНЫЙ ПОТОК (FLOW)
═════════════════════
User₁ .collect() User₂ .collect()
▼ ▼
┌─────────────────┐ ┌─────────────────┐
│ Producer starts │ │ Producer starts │
│ (fresh code) │ │ (fresh code) │
│ emit 1, 2, 3 │ │ emit 1, 2, 3 │
└─────────────────┘ └─────────────────┘
Каждый подписчик запускает производителя заново!
ГОРЯЧИЙ ПОТОК (SHAREDFLOW)
══════════════════════════
Producer (runs once)
│
emit 1, 2, 3
│
├─→ User₁ sees: 2, 3 (пропустил 1)
├─→ User₂ sees: 3 (пропустили 1, 2)
└─→ User₃ sees: 1, 2, 3 (использует replay)
Один производитель для всех подписчиков!
СОСТОЯНИЕ (STATEFLOW)
════════════════════
[State: 42]
│
├─→ User₁ .collect() → получает 42 сразу
├─→ User₂ .collect() → получает 42 сразу
└─→ State changes to 99
├─→ User₁ → получает 99
└─→ User₂ → получает 99
Новые подписчики получают ПОСЛЕДНЕЕ значение!
🔄 Flow операторы - визуальная цепочка
SOURCE FLOW DATA TRANSFORMATION PIPELINE
=========================================
Source: 1, 2, 3, 4, 5
│
▼
┌─────────────┐
│ filter │ Исключить нечётные
│ { it % 2 } │
└─────────────┘
│
▼
2, 4
│
▼
┌──────────────┐
│ map │ Преобразовать в строки
│ { "N: $it" }│
└──────────────┘
│
▼
"N: 2", "N: 4"
│
▼
┌──────────────┐
│ flatMap │ Раскрыть каждое значение
│ { async{}} │
└──────────────┘
│
▼
Расширенные данные
│
▼
┌────────────────┐
│ flowOn(IO) │ Переключить контекст
└────────────────┘
│
▼
В контексте Dispatchers.IO
│
▼
┌────────────────┐
│ catch │ Обработка ошибок
│ { fallback() } │
└────────────────┘
│
▼
Конечные данные с fallback
│
▼ collect { result -> ... }
TIMING DIAGRAM - COLD FLOW EXECUTION
────────────────────────────────────
Flow creation: flow { emit(1); emit(2); emit(3) }
│
├─ NOT executing yet!
│
Subscriber 1 calls .collect():
│
▼ STARTS EXECUTION
┌────────────────┐
Time 0: │ emit 1 │
└────────────────┘
│
Time 1000ms: ┌────────────────┐
│ emit 2 │
└────────────────┘
│
Time 2000ms: ┌────────────────┐
│ emit 3 │
└────────────────┘
│
▼ STOPS
Subscriber 2 calls .collect() (later):
│
▼ STARTS EXECUTION (again!)
┌────────────────┐
Time 3000ms: │ emit 1 │
└────────────────┘
│
Time 4000ms: ┌────────────────┐
│ emit 2 │
└────────────────┘
📋 Flow операторы - справочник
Трансформация данных
// map - преобразование каждого элемента
numbersFlow().map { it * 2 } // 1,2,3,4,5 → 2,4,6,8,10
// filter - отбор элементов
numbersFlow().filter { it % 2 == 0 } // 1,2,3,4,5 → 2,4
// flatMapConcat - последовательное раскрытие
userIds.asFlow()
.flatMapConcat { userId ->
userOrders(userId) // для каждого userId получить его заказы
}
// flatMapMerge - параллельное раскрытие
userIds.asFlow()
.flatMapMerge(concurrency = 3) { userId ->
userOrders(userId) // максимум 3 параллельных запроса
}
// transform - полный контроль
numbersFlow().transform { value ->
emit(value)
emit(value * 2) // emit может быть вызван несколько раз
}
Операции без изменения потока
// onEach - выполнить side effect для каждого элемента
dataFlow()
.onEach { data ->
log.info("Processing: $data")
metricsService.record(data)
}
// onStart - выполнить при начале подписки
dataFlow().onStart { log.info("Collection started") }
// onCompletion - выполнить при завершении
dataFlow().onCompletion { log.info("Collection completed") }
// onEmpty - выполнить если поток пуст
dataFlow().onEmpty { log.warn("No data received") }
Объединение потоков
// combine - комбинировать последние значения из двух потоков
val users = usersFlow()
val statuses = statusesFlow()
combine(users, statuses) { user, status ->
UserWithStatus(user, status)
}
// zip - объединить по паре
val names = namesFlow() // Alice, Bob, Charlie
val ages = agesFlow() // 30, 25, 35
zip(names, ages) { name, age ->
Person(name, age)
}
// Результат: Person(Alice, 30), Person(Bob, 25), Person(Charlie, 35)
// merge - объединить в один поток
val events1 = eventFlow1()
val events2 = eventFlow2()
merge(events1, events2) // все события в один поток
Управление потоком
// take - взять первые N элементов
dataFlow().take(5)
// drop - пропустить первые N элементов
dataFlow().drop(3)
// filter - оставить элементы по условию
dataFlow().filter { it.isValid }
// distinct - только уникальные значения
dataFlow().distinct()
// distinctUntilChanged - пропустить последовательные дубли
dataFlow().distinctUntilChanged()
// debounce - игнорировать быстрые изменения
searchFlow.debounce(300) // ждём 300ms перед emit
// throttle - ограничить частоту emit
eventFlow.throttle(100) // максимум один раз за 100ms
// sample - брать значение каждые N ms
sensorFlow.sample(1000) // снимок каждую секунду
// timeout - отмена если нет данных за время
dataFlow.timeout(5000) // бросить TimeoutCancellationException если > 5 сек молчания
// retry - переподписаться при ошибке
dataFlow.retry(times = 3) { throwable ->
throwable is IOException // retry только на сетевых ошибках
}
Терминальные операторы
// collect - базовая подписка
dataFlow().collect { value ->
println(value)
}
// first - получить первый элемент
val firstValue = dataFlow().first()
// last - получить последний элемент
val lastValue = dataFlow().last()
// single - получить единственный элемент (или ошибка)
val onlyValue = dataFlow().single() // throws if not exactly one
// toList - собрать все элементы в список
val allValues = dataFlow().toList()
// toSet - собрать в set
val uniqueValues = dataFlow().toSet()
// fold - аккумулировать значения
val sum = numbersFlow().fold(0) { acc, value -> acc + value }
// reduce - аккумулировать с начальным значением из потока
val sum = numbersFlow().reduce { acc, value -> acc + value }
🌊 StateFlow и SharedFlow - горячие потоки
// StateFlow - текущее состояние
class UserViewModel {
private val _state = MutableStateFlow<UiState>(UiState.Loading)
val state: StateFlow<UiState> = _state.asStateFlow()
fun loadUser(id: String) {
viewModelScope.launch {
try {
_state.value = UiState.Loading
val user = userService.getUser(id)
_state.value = UiState.Success(user)
} catch (e: Exception) {
_state.value = UiState.Error(e.message)
}
}
}
}
// Подписка - новый подписчик получает ТЕКУЩЕЕ состояние
viewModel.state.collect { state ->
when (state) {
is UiState.Loading -> showProgress()
is UiState.Success -> displayUser(state.user)
is UiState.Error -> showError(state.message)
}
}
// SharedFlow - события без состояния
class EventBus {
private val _events = MutableSharedFlow<AppEvent>()
val events = _events.asSharedFlow()
suspend fun publishEvent(event: AppEvent) {
_events.emit(event)
}
}
// Подписка - получает только НОВЫЕ события
eventBus.events.collect { event ->
when (event) {
is UserLoggedIn -> handleLogin(event.userId)
is OrderCreated -> showNotification(event.orderId)
}
}
🚀 Практические примеры
Пример 1: REST API с retry и timeout
suspend fun fetchUserDataRobust(userId: String): User = withContext(Dispatchers.IO) {
userFlow(userId)
.retry(times = 3) { throwable ->
delay(1000) // exponential backoff
throwable is IOException || throwable is TimeoutException
}
.timeout(10000) // 10 second timeout
.catch { exception ->
log.error("Failed to fetch user after retries", exception)
throw UserServiceException("Could not load user", exception)
}
.first()
}
fun userFlow(userId: String): Flow<User> = flow {
val user = httpClient.get("/users/$userId")
emit(user)
}
Пример 2: Pagination через Flow
fun loadAllUsersFlow(pageSize: Int = 50): Flow<User> = flow {
var page = 0
while (currentCoroutineContext().isActive) {
val users = userRepository.findByPage(page, pageSize)
if (users.isEmpty()) break
users.forEach { emit(it) }
page++
}
}
// Использование
suspend fun processAllUsers() {
loadAllUsersFlow()
.filter { it.isActive }
.map { it.toLDTO() }
.take(100)
.collect { user ->
processUser(user)
}
}
Пример 3: Поиск с debounce
class SearchViewModel {
private val _searchQuery = MutableStateFlow("")
val searchResults: StateFlow<List<SearchResult>> =
_searchQuery
.debounce(300) // wait 300ms before searching
.distinctUntilChanged()
.flatMapLatest { query ->
if (query.isBlank()) flowOf(emptyList())
else searchService.search(query)
}
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = emptyList()
)
fun updateSearchQuery(query: String) {
_searchQuery.value = query
}
}
🛡️ Error Handling в Flow
// catch - обработка ошибок в upstream
dataFlow()
.map { risky(it) }
.catch { exception ->
when (exception) {
is NetworkException -> emit(CachedData)
is ParseException -> log.error("Parse failed", exception)
else -> throw exception
}
}
.collect { value -> println(value) }
// onCompletion - cleanup
dataFlow()
.onCompletion { cause ->
if (cause is CancellationException) {
log.info("Cancelled")
} else if (cause != null) {
log.error("Failed", cause)
} else {
log.info("Completed successfully")
}
}
.collect { value -> println(value) }
📋 Чеклист при использовании Flow
Корутины и Spring Framework
Интеграция корутин в Spring
Spring Framework с версии 5.2 предоставляет нативную поддержку корутин Kotlin, позволяя писать неблокирующие реактивные приложения в императивном стиле. Это дает возможность использовать знакомый синтаксис вместо сложных reactive chains.
Ключевые преимущества:
- Императивный код вместо callback chains
- Structured concurrency — автоматическое управление жизненным циклом
- Высокая производительность — неблокирующие I/O операции
- Простота отладки — линейный flow выполнения
📊 Spring MVC vs WebFlux + Coroutines
SPRING MVC (Blocking)
═════════════════════
@RestController
class UserController(private val userService: UserService) {
@GetMapping("/users/{id}")
fun getUser(@PathVariable id: String): User {
return userService.findById(id) // ← БЛОКИРУЕТ ПОТОК
}
}
Request workflow:
┌─ HTTP Request arrives
├─ Spring allocates Thread from pool
├─ getUser() called → blocks thread
│ └─ userService.findById() → JDBC query ← БЛОКИРУЕТ!
├─ Thread frozen while waiting
├─ Response sent
└─ Thread released
SPRING WEBFLUX + COROUTINES (Non-blocking)
═══════════════════════════════════════════
@RestController
class UserControllerCoroutines(private val userService: UserService) {
@GetMapping("/users/{id}")
suspend fun getUser(@PathVariable id: String): User {
return userService.findById(id) // suspend, не блокирует!
}
}
Request workflow:
┌─ HTTP Request arrives
├─ Spring allocates Coroutine (not thread!)
├─ getUser() called → suspends
│ └─ userService.findById() → R2DBC query ← SUSPENDS, не блокирует!
├─ Thread freed for other requests
├─ Query complete → resume coroutine
├─ Response sent
└─ Coroutine released
SCALABILITY COMPARISON
══════════════════════
Thread Pool: 200 threads
├─ Blocks on I/O → threads frozen
├─ Can serve ~200 concurrent requests
└─ Memory: ~200-300 MB
Coroutine Pool: 2 threads (Dispatchers.IO: 64+)
├─ Suspends on I/O → threads active
├─ Can serve ~10,000+ concurrent requests
└─ Memory: ~1-2 MB
🎯 Основные компоненты
1. Suspend контроллеры
@RestController
@RequestMapping("/api/users")
class UserControllerCoroutines(
private val userService: UserService,
private val orderService: OrderService
) {
// Простой случай: one-to-one операция
@GetMapping("/{id}")
suspend fun getUserWithOrders(@PathVariable id: String): UserWithOrders {
val user = userService.findById(id) // suspend
val orders = orderService.findByUserId(id) // suspend
return UserWithOrders(user, orders)
}
// ⏱️ TIMING:
// Sequential suspend: ~200ms (if each takes 100ms)
// Parallel would be better!
// Параллельные операции
@GetMapping("/{id}/dashboard")
suspend fun getUserDashboard(@PathVariable id: String): Dashboard = coroutineScope {
val userAsync = async { userService.findById(id) }
val ordersAsync = async { orderService.findByUserId(id) }
val statsAsync = async { statsService.getUserStats(id) }
Dashboard(
user = userAsync.await(),
orders = ordersAsync.await(),
stats = statsAsync.await()
)
}
// ⏱️ TIMING:
// Parallel async: ~100ms (best of all)
}
2. R2DBC - реактивный доступ к БД
// ✅ R2DBC repositories возвращают suspend функции
interface UserRepository : CoroutineCrudRepository<User, String> {
suspend fun findById(id: String): User? // suspend, не blocking!
suspend fun findByEmail(email: String): User?
suspend fun save(user: User): User
}
// Spring автоматически преобразует Mono/Flux в suspend функции!
@Service
class UserService(private val userRepository: UserRepository) {
suspend fun findById(id: String): User? {
return userRepository.findById(id)
}
suspend fun findAll(): List<User> {
return userRepository.findAll().toList() // Flow to List
}
suspend fun save(user: User): User {
return userRepository.save(user)
}
}
// Batch операции
@Service
class BatchUserService(private val userRepository: UserRepository) {
suspend fun saveAllUsers(users: List<User>) = supervisorScope {
// Параллельное сохранение батчами
users.chunked(100)
.map { batch ->
async {
batch.forEach { userRepository.save(it) }
}
}
.forEach { it.await() }
}
}
3. WebClient - неблокирующий HTTP клиент
@Service
class ExternalApiService(
private val webClient: WebClient,
private val exceptionHandler: CoroutineExceptionHandler
) {
suspend fun fetchRemoteUser(id: String): RemoteUser =
withContext(Dispatchers.IO + exceptionHandler) {
withTimeout(5000) {
webClient
.get()
.uri("/users/{id}", id)
.retrieve()
.awaitBody<RemoteUser>()
}
}
suspend fun fetchMultipleUsers(ids: List<String>): List<RemoteUser> =
supervisorScope {
ids.map { id ->
async {
try {
fetchRemoteUser(id)
} catch (e: Exception) {
log.warn("Failed to fetch user $id", e)
null
}
}
}
.mapNotNull { it.await() }
}
}
// ⏱️ TIMING:
// Sequential: 5 users × 100ms = 500ms
// Parallel async: max(100ms) = 100ms ✓
🔒 Spring Security с корутинами
// Получение current user из Security Context
@Service
class SecureUserService(private val userRepository: UserRepository) {
suspend fun getCurrentUser(): User? {
return SecurityContextHolder.getContext()?.authentication?.let { auth ->
userRepository.findByEmail(auth.name)
}
}
suspend fun getUserData(): UserData = coroutineScope {
val currentUser = getCurrentUser()
?: throw UnauthorizedException("Not authenticated")
val user = async { userRepository.findById(currentUser.id) }
val orders = async { fetchUserOrders(currentUser.id) }
UserData(
user = user.await(),
orders = orders.await()
)
}
}
// Security filter для корутин
@Component
class CoroutineSecurityFilter : OncePerRequestFilter() {
override fun doFilterInternal(
request: HttpServletRequest,
response: HttpServletResponse,
filterChain: FilterChain
) {
// Spring Security работает с корутинами через ThreadLocal
// обычным способом (за сценами работает правильно)
filterChain.doFilter(request, response)
}
}
📊 Error handling в Spring WebFlux
@RestController
@RequestMapping("/api")
class ErrorHandlingController(private val service: Service) {
@GetMapping("/safe")
suspend fun safeEndpoint(): ResponseEntity<Data> {
return try {
val data = service.fetchData()
ResponseEntity.ok(data)
} catch (e: BusinessException) {
ResponseEntity.status(HttpStatus.BAD_REQUEST)
.body(ErrorResponse(e.message))
} catch (e: Exception) {
log.error("Unexpected error", e)
ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.build()
}
}
// Global exception handler
@ExceptionHandler(TimeoutException::class)
suspend fun handleTimeout(ex: TimeoutException): ResponseEntity<ErrorResponse> {
return ResponseEntity.status(HttpStatus.REQUEST_TIMEOUT)
.body(ErrorResponse("Operation timed out"))
}
}
// ExceptionHandler filter
@Component
class CoroutineExceptionHandlerFilter : OncePerRequestFilter() {
override fun doFilterInternal(
request: HttpServletRequest,
response: HttpServletResponse,
filterChain: FilterChain
) {
try {
filterChain.doFilter(request, response)
} catch (e: CancellationException) {
// Handle gracefully, don't log as error
log.debug("Request cancelled")
} catch (e: Exception) {
log.error("Request failed", e)
response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value())
}
}
}
🌊 Server-Sent Events (SSE) с Flow
@RestController
@RequestMapping("/api/events")
class EventStreamController(private val eventService: EventService) {
@GetMapping("/stream")
suspend fun streamEvents(): ResponseEntity<Flow<ServerSentEvent<Event>>> {
return ResponseEntity.ok(
eventService.eventFlow()
.map { ServerSentEvent.builder<Event>()
.id(it.id)
.data(it)
.build()
}
.catch { e ->
log.error("Event stream error", e)
emit(ServerSentEvent.builder<Event>()
.comment("Error: ${e.message}")
.build())
}
)
}
}
@Service
class EventService(private val repository: EventRepository) {
fun eventFlow(): Flow<Event> = flow {
while (currentCoroutineContext().isActive) {
val newEvents = repository.findNewEvents()
newEvents.forEach { emit(it) }
delay(1000) // poll every second
}
}
}
// ⏱️ TIMING:
// 1000 clients × Flow push = 1 thread handling all!
🎯 Миграция с Mono/Flux на suspend
// БЫЛО: Reactor way (Mono/Flux chains)
@GetMapping("/old-way")
fun getUserOldWay(@PathVariable id: String): Mono<User> {
return userRepository.findById(id)
.doOnNext { log.info("Found user: ${it.name}") }
.onErrorMap { UserNotFoundException(it.message) }
.timeout(Duration.ofSeconds(5))
}
// СТАЛО: Suspend way (imperative)
@GetMapping("/new-way")
suspend fun getUserNewWay(@PathVariable id: String): User {
val user = userRepository.findById(id)
?: throw UserNotFoundException("User $id not found")
log.info("Found user: ${user.name}")
return user
}
// Конвертация если нужно:
@GetMapping("/hybrid")
suspend fun hybridEndpoint(): Flow<User> {
return userRepository.findAll() // R2DBC returns Flow
.onEach { log.info("Processing user: ${it.name}") }
.timeout(5.seconds) // timeout для Flow
}
📊 Практический пример: Полный микросервис
@Configuration
class CoroutineConfig {
@Bean
fun webClient(): WebClient = WebClient.create()
}
@RestController
@RequestMapping("/api/users")
class UserApiController(
private val userService: UserService,
private val exceptionHandler: CoroutineExceptionHandler
) {
@GetMapping
suspend fun listUsers(): List<UserDTO> {
return userService.getAllUsers()
.map { it.toDTO() }
}
@GetMapping("/{id}")
suspend fun getUser(@PathVariable id: String): UserDTO {
return userService.findById(id)?.toDTO()
?: throw ResponseStatusException(HttpStatus.NOT_FOUND)
}
@PostMapping
suspend fun createUser(@RequestBody request: CreateUserRequest): UserDTO {
val user = userService.create(request)
return user.toDTO()
}
@GetMapping("/{id}/dashboard")
suspend fun getUserDashboard(
@PathVariable id: String
): UserDashboardDTO = coroutineScope {
val userAsync = async { userService.findById(id) }
val ordersAsync = async { userService.getUserOrders(id) }
val statisticsAsync = async { userService.getUserStats(id) }
UserDashboardDTO(
user = userAsync.await()?.toDTO()
?: throw ResponseStatusException(HttpStatus.NOT_FOUND),
orders = ordersAsync.await(),
statistics = statisticsAsync.await()
)
}
}
@Service
class UserService(
private val userRepository: UserRepository,
private val orderRepository: OrderRepository,
private val externalApi: ExternalApiService
) {
suspend fun findById(id: String): User? {
return userRepository.findById(id)
}
suspend fun getAllUsers(): List<User> {
return userRepository.findAll().toList()
}
suspend fun create(request: CreateUserRequest): User {
val user = User(
id = UUID.randomUUID().toString(),
name = request.name,
email = request.email
)
return userRepository.save(user)
}
suspend fun getUserOrders(userId: String): List<Order> {
return orderRepository.findByUserId(userId).toList()
}
suspend fun getUserStats(userId: String): UserStats = supervisorScope {
val ordersCountAsync = async {
orderRepository.findByUserId(userId).count()
}
val totalSpentAsync = async {
orderRepository.findByUserId(userId)
.map { it.amount }
.fold(0.0) { acc, value -> acc + value }
}
UserStats(
ordersCount = ordersCountAsync.await(),
totalSpent = totalSpentAsync.await()
)
}
}
@Repository
interface UserRepository : CoroutineCrudRepository<User, String>
@Repository
interface OrderRepository : CoroutineRepository<Order, String> {
fun findByUserId(userId: String): Flow<Order>
}
📋 Чеклист: Spring + Coroutines production readiness
🎓 Ключевые выводы
- Suspend контроллеры = простой способ к non-blocking endpoints
- R2DBC = неблокирующий доступ к БД (вместо JDBC)
- WebClient = неблокирующие HTTP запросы
- async/await = параллельные операции в контроллерах
- supervisorScope = graceful degradation для независимых операций
- Exception handling = обязателен для production
- Timeout = защита от зависаний
- Flow for streaming = идеально для SSE и push уведомлений
- Security context = работает нормально с корутинами
- Performance = 10x-100x лучше чем thread pool approach
Производительность и оптимизация
📊 Утечки памяти - главные источники
GlobalScope - источник утечек памяти
GlobalScope — это глобальная область видимости корутин, которая живёт всё время жизни приложения. Использование GlobalScope может привести к утечкам памяти и неконтролируемому выполнению корутин.
MEMORY LEAK VISUALIZATION
═════════════════════════
GlobalScope Approach (❌ ОПАСНО):
─────────────────────────────────
Object created:
│
└─ GlobalScope.launch { }
│
└─ Coroutine continues forever
├─ Holds reference to context
├─ Holds reference to captured variables
└─ Never cancelled!
Object destroyed:
└─ But coroutine still running in GlobalScope
└─ Memory leak! Object cannot be GC'd
CoroutineScope Approach (✅ ПРАВИЛЬНО):
──────────────────────────────────────
Object created:
│
└─ launch { } (внутри scope)
│
└─ Coroutine running
├─ Holds reference to context
└─ Linked to scope lifecycle
Object destroyed:
└─ job.cancel()
└─ All coroutines cancelled
├─ References released
└─ Object can be GC'd ✓
// ❌ ПЛОХО - утечка памяти (никогда не отменяется!)
class UserService {
fun loadUserData(userId: String) {
GlobalScope.launch {
val userData = fetchUserFromApi(userId) // может выполняться вечно
updateCache(userData)
}
// Нет способа отменить эту корутину!
// Если UserService уничтожается, корутина продолжает работать
}
}
// ❌ ПЛОХО - долгоживущие ссылки
class MainActivity {
fun startBackgroundWork() {
GlobalScope.launch {
while (true) {
processData() // бесконечный цикл
delay(1000)
// Активность может быть уничтожена, но корутина продолжит работать
// и будет держать ссылку на контекст через замыкание
}
}
}
}
// ✅ ХОРОШО - контролируемый scope
class UserService : CoroutineScope {
private val job = SupervisorJob()
override val coroutineContext = Dispatchers.IO + job
fun loadUserData(userId: String) {
launch { // привязано к scope сервиса
val userData = fetchUserFromApi(userId)
updateCache(userData)
}
}
fun cleanup() {
job.cancel() // отменяет ВСЕ корутины сервиса
}
}
// ✅ ХОРОШО - scope привязанный к lifecycle
class BackgroundTaskManager {
private var serviceJob: Job? = null
fun start() {
serviceJob = CoroutineScope(Dispatchers.Default).launch {
while (isActive) { // проверяет отмену
processData()
delay(1000)
}
}
}
fun stop() {
serviceJob?.cancel() // явная отмена
serviceJob = null
}
}
⚠️ Проблемы с ViewModels (Android)
// ❌ ПЛОХО - утечка ViewModel
class UserViewModel : ViewModel() {
fun loadData() {
GlobalScope.launch { // переживёт ViewModel!
val data = repository.loadData()
// Попытка обновить UI после уничтожения ViewModel
_uiState.value = UiState.Success(data)
}
}
}
// ✅ ХОРОШО - viewModelScope
class UserViewModel : ViewModel() {
fun loadData() {
viewModelScope.launch { // автоматически отменяется в onCleared()
val data = repository.loadData()
_uiState.value = UiState.Success(data)
}
}
// Bonus: можно переопределить для тестирования
override fun onCleared() {
super.onCleared() // отменяет все корутины
log.info("ViewModel cleared, all coroutines cancelled")
}
}
🎯 UI Блокировка при неправильном Dispatcher
Проблема: Main Dispatcher
Main Dispatcher предназначен для UI операций и имеет один поток. Выполнение тяжёлых операций в Main Dispatcher заблокирует UI.
BLOCKING DIAGRAM
════════════════
ПРАВИЛЬНО (60 FPS):
┌─ Frame 0ms: ▓
├─ Frame 16ms: ▓ (UI responsive)
├─ Frame 32ms: ▓
└─ Frame 48ms: ▓
НЕПРАВИЛЬНО (UI FREEZES):
┌─ Frame 0ms: ▓
├─ Frame 16ms: █████████████ (BLOCKED!)
├─ Frame 32ms: █████████████ (BLOCKED!)
├─ Frame 48ms: █████████████ (BLOCKED!)
└─ Frame 64ms: ▓ (finally responsive, but 4 frames lost!)
Main Thread Timeline:
═════════════════════
❌ BAD - Heavy computation blocks main thread:
┌────────────────────────────────────────────┐
│ Main Thread │
├────────────────────────────────────────────┤
│ withContext(Dispatchers.Main) { │
│ heavyComputation() ← BLOCKS 500ms! │
│ } │
│ // UI frozen for 500ms │
└────────────────────────────────────────────┘
✅ GOOD - Heavy computation in background:
┌─────────────────────────────┐
│ Main Thread (responsive) │
├─────────────────────────────┤
│ withContext(Main) { } │ ← fast
│ │
│ withContext(Dispatchers... } ← background
│ │
│ showProgress() │ ← responsive
└─────────────────────────────┘
// ❌ ПЛОХО - блокировка UI
class DataProcessor {
suspend fun processLargeDataset(data: List<DataItem>) {
// Выполняется в Main dispatcher по умолчанию
data.forEach { item ->
val result = performHeavyComputation(item) // БЛОКИРУЕТ UI!
updateProgressBar(result.progress)
}
}
}
// ❌ ПЛОХО - синхронные операции в Main
suspend fun loadDataFromNetwork(): String {
// Если это выполняется в Main dispatcher
return URL("https://api.example.com/data")
.readText() // Блокирующий I/O вызов! UI зависнет
}
// ✅ ХОРОШО - правильное использование dispatchers
class DataProcessor {
suspend fun processLargeDataset(data: List<DataItem>) = withContext(Dispatchers.Main) {
showProcessing()
data.forEach { item ->
// Тяжёлые вычисления в background
val result = withContext(Dispatchers.Default) {
performHeavyComputation(item)
}
// Обновление UI в Main потоке
updateProgressBar(result.progress)
}
hideProcessing()
}
}
// ✅ ХОРОШО - правильная архитектура
class NetworkService {
suspend fun loadData(): String = withContext(Dispatchers.IO) {
// I/O операции в IO dispatcher
httpClient.get("https://api.example.com/data")
}
suspend fun processData(rawData: String): ProcessedData = withContext(Dispatchers.Default) {
// CPU-интенсивные операции в Default dispatcher
parseAndProcess(rawData)
}
}
class UIController {
suspend fun refreshData() {
// UI код остаётся в Main dispatcher
showLoading()
try {
val rawData = networkService.loadData()
val processedData = networkService.processData(rawData)
// Обновление UI в Main dispatcher
displayData(processedData)
} catch (e: Exception) {
showError(e.message)
} finally {
hideLoading()
}
}
}
⚠️ Блокирующие операции в корутинах
// ❌ ПЛОХО - блокирующие операции в корутинах
suspend fun badDatabaseAccess() {
// Блокирующий JDBC в корутине
val connection = DriverManager.getConnection(jdbcUrl)
val resultSet = connection.createStatement()
.executeQuery("SELECT * FROM users") // блокирует поток!
}
// ✅ ХОРОШО - неблокирующие операции (R2DBC)
suspend fun goodDatabaseAccess() {
// R2DBC или другие неблокирующие драйверы
val users = userRepository.findAll() // suspend функция, не блокирует!
return users
}
// ✅ ХОРОШО - изоляция блокирующих операций
suspend fun legacySystemIntegration() = withContext(Dispatchers.IO.limitedParallelism(4)) {
// Изолируем блокирующий код в IO dispatcher с ограничением
val legacyClient = LegacyJdbcClient()
legacyClient.blockingOperation() // ограничено 4 потоками макс
}
📊 launch vs async - когда использовать
launch - Fire and Forget (side effects)
// ✅ ХОРОШО - launch для независимых side effects
class NotificationService {
suspend fun sendNotifications(users: List<User>) {
users.forEach { user ->
// Не ждём завершения каждой отправки
launch {
emailService.sendEmail(user.email, "Welcome!")
analyticsService.trackEmailSent(user.id)
}
}
// Функция завершается сразу, уведомления отправляются в background
}
}
// ❌ ПЛОХО - неправильное использование launch
class DataService {
suspend fun getUserData(id: String): UserData {
var userData: UserData? = null
launch { // launch НЕ блокирует выполнение!
userData = fetchUserData(id)
}
return userData!! // ОШИБКА! userData всегда null
}
}
// ✅ REAL-WORLD: Order processing
class OrderProcessor {
suspend fun processOrder(order: Order): ProcessedOrder {
// Основная логика обработки заказа
val processedOrder = validateAndProcessOrder(order)
// Независимые side effects - не ждём завершения
launch { sendOrderConfirmation(order.customerEmail) }
launch { updateInventory(order.items) }
launch { logOrderProcessing(order.id) }
launch { updateAnalytics(order) }
// Возвращаем результат сразу
return processedOrder
}
}
async - Параллельные операции с результатом
// ✅ ХОРОШО - async для параллельных операций
class UserProfileService {
suspend fun loadCompleteProfile(userId: String): CompleteProfile = coroutineScope {
// Запускаем операции параллельно
val profileAsync = async { userService.getProfile(userId) }
val ordersAsync = async { orderService.getUserOrders(userId) }
val preferencesAsync = async { preferencesService.getUserPreferences(userId) }
// Ждём завершения всех операций (максимум времени самой долгой)
CompleteProfile(
profile = profileAsync.await(),
orders = ordersAsync.await(),
preferences = preferencesAsync.await()
)
}
}
// ⏱️ TIMING DIAGRAM - async vs sequential:
//
// Sequential:
// ├─ fetchUser() ▓▓▓ 500ms
// ├─ fetchOrders() ▓▓▓ 500ms
// ├─ fetchPrefs() ▓▓▓ 500ms
// Total: 1500ms ❌
//
// Parallel with async:
// ├─ fetchUser() ▓▓▓
// ├─ fetchOrders() ▓▓▓ (concurrent)
// ├─ fetchPrefs() ▓▓▓
// Total: 500ms ✓ (3x faster!)
Таблица сравнения: launch vs async
| Аспект | launch | async |
|---|---|---|
| Возвращаемое значение | Job | Deferred |
| Получение результата | Не предусмотрено | .await() |
| Назначение | Side effects | Параллельные вычисления |
| Исключения | Всплывают к родителю | Заморожены до await() |
| Блокировка вызывающей | Не блокирует | Блокирует на await() |
| Use case | Логирование, уведомления | API вызовы, вычисления |
🔄 Корутины vs RxJava/Project Reactor
Парадигмы сравнения
IMPERATIVE vs REACTIVE
══════════════════════
Coroutines (Imperative):
1. Код выглядит как обычный синхронный код
2. Читаем сверху вниз (линейно)
3. Отладка как обычно через debugger
4. Exception handling через try-catch
Пример: "Получить user, потом его orders"
RxJava (Functional Reactive):
1. Код - цепочка трансформаций
2. Читаем как pipeline (operator chains)
3. Отладка через .doOnNext(), логи
4. Exception handling через обработчики в цепи
Пример: "userObservable.flatMap(orders).map(process)"
// RxJava - функциональная цепочка
class RxUserService {
fun loadUserProfile(userId: String): Single<UserProfile> {
return userRepository.findById(userId)
.flatMap { user ->
ordersRepository.findByUserId(user.id)
.map { orders -> UserProfile(user, orders) }
}
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.timeout(30, TimeUnit.SECONDS)
.retry(3)
}
}
// Корутины - императивный подход (проще!)
class CoroutineUserService {
suspend fun loadUserProfile(userId: String): UserProfile = withContext(Dispatchers.IO) {
return@withContext try {
val user = userRepository.findById(userId)
val orders = ordersRepository.findByUserId(user.id)
UserProfile(user, orders)
} catch (e: Exception) {
retry(3) { // простой retry
UserProfile(
userRepository.findById(userId),
ordersRepository.findByUserId(userId)
)
}
}
}
}
Преимущества корутин - Читаемость
// RxJava - сложно отлаживать (вложенные flatMap)
fun complexRxChain(): Observable<Result> {
return dataSource1.loadData()
.flatMap { data1 ->
dataSource2.loadData(data1.id)
.flatMap { data2 ->
dataSource3.processData(data1, data2)
.map { processedData ->
Result(data1, data2, processedData)
}
}
}
.onErrorResumeNext { error ->
handleError(error)
.andThen(Observable.just(defaultResult))
}
}
// Корутины - линейный код (легко понять!)
suspend fun complexCoroutineChain(): Result {
return try {
val data1 = dataSource1.loadData()
val data2 = dataSource2.loadData(data1.id)
val processedData = dataSource3.processData(data1, data2)
Result(data1, data2, processedData)
} catch (error: Exception) {
handleError(error)
defaultResult
}
}
Таблица сравнения: Coroutines vs RxJava vs Reactor
| Аспект | Корутины | RxJava | Reactor |
|---|---|---|---|
| Парадигма | Императивная | Функциональная | Функциональная |
| Читаемость | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐⭐ |
| Отладка | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ |
| Exception handling | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ |
| Операторы | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
| Backpressure | Автоматическая | Ручная настройка | Ручная настройка |
| Learning curve | ⭐⭐⭐⭐⭐ (низкая) | ⭐⭐ (высокая) | ⭐⭐ (высокая) |
| Performance | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
Decision Tree: Корутины vs RxJava
Выбираешь между Coroutines и RxJava?
│
├─ Новый проект?
│ └─ YES → Используй Coroutines (проще)
│
├─ Уже используется RxJava?
│ └─ YES → Продолжай RxJava (консистентность)
│
├─ Нужна простота и readability?
│ └─ YES → Coroutines лучше
│
├─ Сложная stream обработка (debounce, combineLatest, etc)?
│ └─ YES → RxJava может быть проще
│
├─ Нужна точная backpressure стратегия?
│ └─ YES → RxJava лучше
│
└─ Команда не знакома с Rx?
└─ YES → Coroutines (низкий порог входа)
🚀 Performance оптимизации
Batch Processing
PERFORMANCE COMPARISON
══════════════════════
❌ INEFFICIENT: One coroutine per item
10,000 items × create coroutine:
├─ Allocation: 10,000 × allocation overhead
├─ Context switches: 10,000 × switches
├─ GC pressure: High (temporary objects)
└─ Time: ~500ms
✅ EFFICIENT: Batched processing
10,000 items / 100 per batch = 100 batches:
├─ Allocation: 100 × allocation overhead
├─ Context switches: 100 × switches
├─ GC pressure: Low
└─ Time: ~50ms (10x faster!)
// ❌ ПЛОХО - создание множества корутин (неэффективно)
class IneffientProcessor {
suspend fun processLargeDataset(data: List<DataItem>) = supervisorScope {
data.forEach { item ->
launch { // создаём корутину для каждого элемента!
processItem(item)
}
}
}
}
// ✅ ХОРОШО - батчевая обработка (эффективно)
class EfficientProcessor {
suspend fun processLargeDataset(data: List<DataItem>) = coroutineScope {
data.chunked(100) // группы по 100 элементов
.map { batch ->
async {
batch.forEach { item ->
processItem(item) // обрабатываем в одной корутине
}
}
}
.awaitAll()
}
}
// ✅ PARALLEL BATCHES с лимитом
class OptimizedProcessor {
suspend fun processLargeDataset(data: List<DataItem>) {
data.chunked(100)
.asFlow()
.flatMapMerge(concurrency = 4) { batch ->
flow {
batch.forEach { processItem(it) }
}
}
.collect()
}
}
Dispatcher оптимизация
// ❌ ПЛОХО - неэффективное использование (много context switches)
class BadPerformanceService {
suspend fun processData(items: List<Item>): List<Result> {
return items.map { item ->
// Context switch для каждого элемента!
withContext(Dispatchers.Default) {
heavyComputation(item)
}
}
}
}
// ✅ ХОРОШО - один context switch для всей работы
class GoodPerformanceService {
suspend fun processData(items: List<Item>): List<Result> =
withContext(Dispatchers.Default) {
// Один context switch - вся работа в Default потоке
items.map { item ->
heavyComputation(item)
}
}
}
// ✅ REAL-WORLD: Limited parallelism для legacy code
class LegacyIntegration {
private val limitedIO = Dispatchers.IO.limitedParallelism(4)
suspend fun processWithLegacy(items: List<Item>): List<Result> =
withContext(limitedIO) {
items.map { item ->
legacyBlockingCall(item) // максимум 4 одновременных вызовов
}
}
}
📋 Production Readiness Checklist
🎓 Ключевые выводы
- GlobalScope = антипаттерн в production коде
- CoroutineScope для полного контроля жизненного цикла
- Правильный Dispatcher = основа performance
- launch для side effects, async для результатов
- Батчевая обработка = оптимизация для больших наборов
- Coroutines проще чем RxJava для большинства случаев
- Context switching = главный overhead, минимизируйте его
- Блокирующие операции = враги корутин
- Structured concurrency = гарантированное управление ресурсами
- Testing важен - concurrency баги сложно отловить!
Тестирование и оптимизация
🧪 Тестирование suspend функций
Базовое тестирование с runBlocking
class UserServiceTest {
@Test
fun testUserLoading() = runBlocking {
// runBlocking блокирует текущий поток до завершения
val user = userService.loadUser("123")
assertEquals("John", user.name)
assertTrue(user.isActive)
}
@Test
fun testErrorHandling() = runBlocking {
assertThrows<UserNotFoundException> {
userService.loadUser("invalid")
}
}
}
Тестирование с TestDispatchers (рекомендуется)
class UserRepositoryTest {
private val testDispatcher = StandardTestDispatcher()
@Before
fun setUp() {
Dispatchers.setMain(testDispatcher)
}
@After
fun tearDown() {
Dispatchers.resetMain()
}
@Test
fun testDataFetching() = runTest(testDispatcher) {
// runTest управляет временем - нет реальной задержки
val repository = UserRepository()
val user = repository.getUser("123")
assertEquals("John", user.name)
}
}
Тестирование Flow
class UserFlowRepositoryTest {
@Test
fun testFlowTransformation() = runTest {
val flow = userIds.asFlow()
.map { id -> userService.getUser(id) }
.filter { user -> user.isActive }
val results = flow.toList() // собрать все элементы
assertEquals(3, results.size)
assertTrue(results.all { it.isActive })
}
@Test
fun testFlowError() = runTest {
val flow = flow<String> {
emit("value1")
throw Exception("Oops!")
}
assertThrows<Exception> {
flow.toList()
}
}
@Test
fun testFlowWithCollect() = runTest {
val values = mutableListOf<Int>()
(1..3).asFlow()
.collect { value ->
values.add(value)
}
assertEquals(listOf(1, 2, 3), values)
}
}
Тестирование с Mocks
class OrderServiceTest {
@get:Rule
val instantExecutorRule = InstantTaskExecutorRule()
private val mockRepository: OrderRepository = mock()
private lateinit var service: OrderService
@Before
fun setUp() {
service = OrderService(mockRepository)
}
@Test
fun testOrderProcessing() = runTest {
// Mock setup
val mockOrder = Order(id = "123", amount = 100.0)
coEvery { mockRepository.findById("123") } returns mockOrder
// Execute
val result = service.processOrder("123")
// Verify
assertEquals("123", result.orderId)
coVerify { mockRepository.findById("123") }
}
}
🚀 Production patterns
Паттерн 1: Graceful Degradation
class ResilientDataService {
suspend fun loadDashboardData(userId: String): DashboardData = supervisorScope {
// Критичные данные
val userAsync = async {
try { userService.getUser(userId) }
catch (e: Exception) {
log.error("User loading failed", e)
null
}
}
// Опциональные данные
val recommendationsAsync = async {
try { recommendationService.get(userId) }
catch (e: Exception) {
log.warn("Recommendations failed", e)
emptyList<Recommendation>()
}
}
val analyticsAsync = async {
try { analyticsService.track(userId) }
catch (e: Exception) {
log.warn("Analytics failed", e)
}
}
DashboardData(
user = userAsync.await(),
recommendations = recommendationsAsync.await(),
analytics = analyticsAsync.await()
)
}
}
Паттерн 2: Retry с exponential backoff
suspend inline fun <T> withRetry(
times: Int = 3,
initialDelay: Long = 100,
maxDelay: Long = 10000,
factor: Double = 2.0,
crossinline block: suspend () -> T
): T {
var currentDelay = initialDelay
var lastException: Exception? = null
repeat(times - 1) { attempt ->
try {
return block()
} catch (e: CancellationException) {
throw e // никогда не ретраим отмену
} catch (e: Exception) {
lastException = e
log.warn("Attempt ${attempt + 1} failed, retrying in ${currentDelay}ms", e)
delay(currentDelay)
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
}
return try {
block() // последняя попытка
} catch (e: Exception) {
throw lastException ?: e
}
}
// Использование
suspend fun reliableApiCall(): ApiResponse = withRetry(times = 3) {
apiClient.fetchData()
}
Паттерн 3: Circuit Breaker
class CircuitBreakerService {
private var failureCount = 0
private var lastFailureTime = 0L
private val circuitBreakerTimeout = 30_000L
suspend fun <T> executeWithCircuitBreaker(
operationName: String,
block: suspend () -> T
): T {
if (isCircuitOpen()) {
throw CircuitBreakerOpenException(
"$operationName is temporarily unavailable"
)
}
return try {
val result = withTimeout(5000) {
block()
}
resetFailureCount()
result
} catch (e: TimeoutCancellationException) {
recordFailure()
throw ServiceTimeoutException("$operationName timed out", e)
} catch (e: Exception) {
recordFailure()
throw e
}
}
private fun isCircuitOpen(): Boolean {
return failureCount >= 5 &&
(System.currentTimeMillis() - lastFailureTime) < circuitBreakerTimeout
}
private fun recordFailure() {
failureCount++
lastFailureTime = System.currentTimeMillis()
}
private fun resetFailureCount() {
failureCount = 0
}
}
📊 Performance optimization
Избегайте блокирующих операций
// ❌ ПЛОХО - блокирующая операция в корутине
suspend fun badDatabaseAccess() {
val connection = DriverManager.getConnection(jdbcUrl) // БЛОКИРУЕТ!
val resultSet = connection.createStatement()
.executeQuery("SELECT * FROM users")
}
// ✅ ХОРОШО - неблокирующая операция
suspend fun goodDatabaseAccess() = withContext(Dispatchers.IO) {
userRepository.findAll() // R2DBC - nonblocking
}
// ✅ ХОРОШО - изоляция блокирующего кода
suspend fun legacyIntegration() = withContext(Dispatchers.IO.limitedParallelism(4)) {
legacyJdbcClient.query() // ограничиваем количество одновременных блокирующих операций
}
Оптимальный выбор Dispatcher
I/O операции: withContext(Dispatchers.IO)
CPU операции: withContext(Dispatchers.Default)
UI операции (Android/Desktop): withContext(Dispatchers.Main)
Legacy code: withContext(Dispatchers.IO.limitedParallelism(n))
Правило:
────────
Используйте Dispatchers.Default для CPU-bound операций
Используйте Dispatchers.IO для I/O operations
НЕ используйте Dispatchers.Unconfined в production (опасно!)
Batch processing для performance
suspend fun processItemsEfficiently(items: List<Item>) = coroutineScope {
// Обработка батчами вместо одного за раз
val batchSize = 100
items.chunked(batchSize)
.map { batch ->
async {
batch.map { item ->
processItem(item)
}
}
}
.flatMap { it.await() }
}
// Параллельная обработка с лимитом
suspend fun processItemsWithLimit(items: List<Item>, concurrency: Int = 10) {
items.asFlow()
.flatMapMerge(concurrency = concurrency) { item ->
flow { emit(processItem(item)) }
}
.collect { /* processed item */ }
}
🛡️ Common pitfalls и как их избежать
Питфол 1: Забытая отмена
// ❌ ПЛОХО
class BadService {
private val scope = CoroutineScope(Dispatchers.IO)
fun load() {
scope.launch { /* work */ }
}
// Забыл отменить scope при очистке!
}
// ✅ ХОРОШО
class GoodService : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
fun load() = launch { /* work */ }
fun cleanup() {
coroutineContext[Job]?.cancel()
}
}
Питфол 2: launch вместо async
// ❌ ПЛОХО - ждёшь результата неправильно
suspend fun wrongUsage(): String {
var result: String? = null
launch { result = expensiveComputation() }
return result ?: "default" // ОШИБКА - result всегда null
}
// ✅ ХОРОШО - используй async + await
suspend fun correctUsage(): String = coroutineScope {
val deferred = async { expensiveComputation() }
return@coroutineScope deferred.await()
}
Питфол 3: Неправильное обращение с CancellationException
// ❌ ПЛОХО
suspend fun badCancellation() {
try {
longRunningOperation()
} catch (e: Exception) {
log.error("Operation failed", e) // логируем отмену как ошибку!
}
}
// ✅ ХОРОШО
suspend fun goodCancellation() {
try {
longRunningOperation()
} catch (e: CancellationException) {
cleanup()
throw e // ВСЕГДА перебрасываем
} catch (e: Exception) {
log.error("Operation failed", e)
throw e
}
}
Питфол 4: GlobalScope
// ❌ ПЛОХО - утечка памяти
class Service {
fun startBackground() {
GlobalScope.launch { // объект может быть destroyed, но корутина продолжит работу
while (true) {
doWork()
delay(1000)
}
}
}
}
// ✅ ХОРОШО
class Service : CoroutineScope {
override val coroutineContext = Dispatchers.IO + SupervisorJob()
fun startBackground() = launch {
while (isActive) {
doWork()
delay(1000)
}
}
fun stop() = coroutineContext.cancel()
}