ML Infrastructure
~35 мин

Оркестрация

Airflow, Dagster, Prefect — оркестраторы, которые запускают ML-пайплайн каждый день: extract данных → compute фичей → retrain модели → deploy. DAG, scheduling, retries, алерты.

Оркестрация — чтобы пайплайн работал каждый день без тебя

Загрузка интерактивного виджета...

🗺️ Где это в общей картине

Оркестратор (Airflow, Dagster) — это диспетчер всего ML-стека. Он не обрабатывает данные сам — он командует: «в 6:00 запусти ETL-пайплайн → когда данные готовы, запусти PySpark для фичей → когда фичи посчитаны, обучи модель → когда модель готова, задеплой». Без оркестратора ты запускаешь каждый шаг руками.

Airflow — это cron на стероидах. Он НЕ обрабатывает данные — он планирует и запускает задачи. У cron нет зависимостей между задачами, ретраев, алертов, UI для мониторинга. У Airflow — есть. Ты описываешь DAG (граф зависимостей) Python-кодом, задаёшь расписание — и Airflow каждое утро выполняет цепочку шагов, перезапуская упавшие и отправляя алерты в Slack.

Конкретный пример: каждое утро в 6:00 Airflow запускает DAG: скачать данные из прод-базы → обработать PySpark-ом → посчитать фичи → обучить модель → проверить метрики → задеплоить → уведомить в Slack. Если шаг 3 упал — Airflow перезапустит только его, а не весь пайплайн.

DAG ML-пайплайна в Airflow: extract, validate, features, train, evaluate, deploy
Оркестратор управляет зависимостями задач и запускает пайплайн по расписанию

Apache Airflow — стандарт индустрии

Airflow создан в Airbnb, сейчас Apache-проект. Используется повсеместно — от стартапов до крупных компаний. Если ты будешь работать с пайплайнами — Airflow встретится почти наверняка.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "ml-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "email_on_failure": True,
}

with DAG(
    dag_id="ml_training_pipeline",
    default_args=default_args,
    schedule="0 6 * * *",          # каждый день в 6:00
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["ml", "training"],
) as dag:

    extract = BashOperator(
        task_id="extract_data",
        bash_command="python scripts/extract.py --date {{ ds }}",
    )

    train = PythonOperator(
        task_id="train_model",
        python_callable=train_and_log_model,
    )

    deploy = BashOperator(
        task_id="deploy_model",
        bash_command="python scripts/deploy.py",
    )

    # Зависимости — кто после кого
    extract >> train >> deploy

Ключевые понятия Airflow

  • DAG — граф задач без циклов. Определяет ЧТО выполнять и В КАКОМ ПОРЯДКЕ
  • Task — единица работы: PythonOperator, BashOperator, SparkSubmitOperator
  • Schedule — cron-выражение: "0 6 * * *" = каждый день в 6:00
  • Sensor — ожидание события: файл появился в S3, данные готовы. Пайплайн не стартует раньше времени
  • XCom — передача маленьких данных между задачами (пути, ID, метрики)
  • Backfill — запустить пайплайн за прошлые даты при изменении логики

Dagster — современная альтернатива

Dagster — «Airflow нового поколения». Главное отличие: вместо задач (tasks) ты думаешь в терминах данных (assets). Не «задача A → задача B», а «dataset A → трансформация → dataset B». Dagster сам строит DAG из зависимостей между assets. Плюсы: встроенное тестирование, type checking, лучший developer experience. Если начинаешь с нуля — стоит рассмотреть.

import dagster as dg
import pandas as pd

@dg.asset(description="Raw events from production DB")
def raw_events() -> pd.DataFrame:
    return pd.read_sql("SELECT * FROM events WHERE date = CURRENT_DATE", conn)

@dg.asset(description="User-level features for ML")
def user_features(raw_events: pd.DataFrame) -> pd.DataFrame:
    # Dagster видит зависимость: user_features нужен raw_events
    return raw_events.groupby("user_id").agg(
        total_events=("event_id", "count"),
        total_revenue=("amount", "sum"),
    ).reset_index()

# Dagster автоматически строит DAG: raw_events → user_features

Prefect — Python-first оркестрация

Prefect — самый простой API. Берёшь обычную Python-функцию, добавляешь декоратор @flow — и она становится пайплайном с ретраями и мониторингом. Хорош для небольших команд, которым не нужен тяжёлый Airflow.

Что выбрать

  • Airflow — стандарт. Огромная экосистема, 100+ операторов, все знают. Сложнее в настройке, но проверен годами
  • Dagster — для ML-команд. Asset-oriented, встроенное тестирование. Лучший DX, но экосистема меньше
  • Prefect — самый простой API. Для небольших команд и быстрого старта
  • Kubeflow Pipelines — если ML-платформа на Kubernetes и нужен inference на GPU

💡 Как это в реальной работе

В большинстве компаний уже стоит Airflow — ты подключаешься к нему и пишешь свои DAG-и. Типичный ML DAG: wait_for_data (sensor) → extract → validate → compute_features (PySpark) → train → evaluate → deploy → notify_slack. Всё видно в Web UI: какие таски прошли, какие упали, логи каждого шага.

ML-специфика оркестрации

Для ML-инженера оркестратор — не просто «запускалка ETL». У ML-пайплайнов свои паттерны:

  • Backfill для ретрейна: изменил логику фичей → нужно пересчитать фичи за последние 90 дней. Airflow backfill запускает DAG за каждую прошлую дату
  • SLA на свежесть фичей: модель рекомендаций должна получить свежие фичи до 8:00. Если ETL не успел — алерт, и модель работает на вчерашних данных (degraded mode)
  • Data quality gates: перед обучением — проверка: нет ли дрифта в данных (PSI), все ли источники доступны, нет ли аномалий в фичах. Если gate не пройден — модель не обучается, уведомление в Slack
  • Feature freshness monitoring: sensor ждёт, пока Spark-задача посчитает фичи, прежде чем запускать обучение. Без sensor — модель может обучиться на устаревших данных

🎯 На собеседовании

Junior

Зачем оркестратор, если есть cron? Cron не умеет зависимости между задачами, ретраи, алерты, UI для мониторинга. Airflow — это cron на стероидах. • Что такое DAG? Directed Acyclic Graph — граф задач без циклов. Определяет порядок выполнения: extract → transform → train → deploy. • Что такое sensor? Задача, которая ждёт внешнего события: файл появился в S3, таблица обновилась. Пайплайн не стартует, пока данные не готовы.

Middle

Airflow vs Dagster — когда что? Airflow: стандарт индустрии, огромная экосистема, task-oriented. Dagster: asset-oriented (думаешь в терминах данных, не задач), встроенное тестирование, лучший DX. Для новых ML-проектов Dagster часто лучше. • Как устроен ваш ML-пайплайн? DAG: wait_for_data (sensor) → extract → validate (data quality) → compute_features (PySpark) → train → evaluate (metrics gate) → deploy → notify. Расписание ежедневно в 6:00. Retries + таймауты. • Что такое backfill и зачем? Запуск пайплайна за прошлые даты. Изменил логику фичей → backfill за 90 дней, чтобы модель обучилась на консистентных данных.

Senior

Как обеспечить SLA для ML-пайплайна? Мониторинг длительности каждого DAG run, алерты при превышении. Feature freshness SLA: фичи должны быть готовы до N:00. Degraded mode: если ETL упал, модель работает на вчерашних фичах, а не падает. • Data quality gates в пайплайне? Перед обучением: PSI < 0.25 (нет дрифта), все источники доступны, нет аномалий в фичах. Если gate не пройден — модель не переобучается, уведомление дежурному. Great Expectations / Pandera для проверок. • Dynamic DAGs и параметризация? Airflow 2.x: dynamic task mapping для обработки переменного числа файлов/моделей. Dagster: partitioned assets для time-based обработки. Конфиги через Variables/Connections, не хардкод.