что такое airflow apache

Как мы оркестрируем процессы обработки данных с помощью Apache Airflow

Всем привет! Меня зовут Никита Василюк, я инженер по работе с данными в департаменте данных и аналитики компании Lamoda. В нашем департаменте Airflow играет роль оркестратора процессов обработки больших данных, с его помощью мы загружаем в Hadoop данные из внешних систем, обучаем ML модели, а также запускаем проверки качества данных, расчеты рекомендательных систем, различных метрик, А/Б-тестов и многое другое.

В этой статье я расскажу:

Что такое Airflow

Airflow – это платформа для создания, мониторинга и оркестрации пайплайнов. Этот open source проект, написанный на Python, был создан в 2014 году в компании Airbnb. В 2016 году Airflow ушел под крылышко Apache Software Foundation, прошел через инкубатор и в начале 2019 года перешел в статус top-level проекта Apache.

В мире обработки данных некоторые называют его ETL-инструментом, но это не совсем ETL в классическом его понимании, как, например, Pentaho, Informatica PowerCenter, Talend и иже с ними. Airflow – это оркестратор, “cron на батарейках”: он сам не выполняет тяжелую работу по перекладке и обработке данных, а говорит другим системам и фреймворкам, что надо делать, и следит за статусом выполнения. Мы в основном используем его для запуска запросов в Hive или Spark джобы.

Спектр решаемых с помощью Airflow задач не ограничивается запуском чего-то в Hadoop кластере. Он может запускать Python-код, выполнять Bash команды, поднимать Docker контейнеры и поды в Kubernetes, выполнять запросы в вашей любимой базе данных и многое другое.

Архитектура Airflow

Примерно так выглядит наш текущий сетап Airflow, только в Lamoda используются два воркера. На отдельной машине крутятся веб-сервер и scheduler, на соседних пыхтят воркеры. Один создан для регулярных задач, второй мы адаптировали для запуска обучения ML моделей с помощью Vowpal Wabbit. Все компоненты общаются между собой через очередь задач и базу метаданных.

На заре развития Airflow в компании все компоненты (кроме БД) работали на одной машине, однако в какой-то момент это привело к нехватке ресурсов на сервере и задержкам в работе шедулера. Поэтому мы решили разнести сервисы по разным серверам и пришли к архитектуре, показанной на картинке выше.

Компоненты Airflow

Webserver – это веб-интерфейс, показывающий, что сейчас происходит с пайплайном. Эту страницу видит пользователь:

Веб-сервер дает возможность просматривать список имеющихся пайплайнов. Возле каждого пайплайна отображается краткая статистика запусков. Также имеется несколько кнопок, которые принудительно запускают пайплайн или показывают детальную информацию: статистику запусков, исходный код пайплайна, его визуализацию в виде графа или таблицы, список задач и историю их запусков.

Если нажать на пайплайн, мы провалимся в меню Graph View. Тут отображаются задачи и связи между ними.

Рядом с Graph View есть меню Tree View. Оно создано для перезапуска задач, просмотра статистики и логов. В левой части отображается древовидное представление графа, напротив него – таблица с историей запуска задач.

Каждая строчка этой страшной таблицы – одна задача, каждый столбец – один запуск пайплайна. На их пересечении – квадратик с запуском определенной задачи за определенную дату. Если на него нажать, появляется меню, где можно посмотреть детальную информацию и логи этой задачи, запустить или перезапустить ее, а также пометить её как успешную или неудачную.

Scheduler – как понятно из названия, запускает пайплайны, когда настает их время. Он представляет собой Python-процесс, который периодически ходит в директорию с пайплайнами, подтягивает оттуда их актуальное состояние, проверяет статус и запускает. Вообще, Scheduler – это самое интересное и одновременно самое узкое место в архитектуре Airflow.

Worker – это место, где запускается наш код и выполняются задачи. Airflow поддерживает несколько экзекьюторов:

Сущности Apache Airflow

Самая важная сущность Airflow – это DAG, он же пайплайн, он же направленный ациклический граф. Чтобы стало понятнее, как его готовить и зачем он нужен, я разберу небольшой пример.

Допустим, к нам пришел аналитик и попросил раз в день наливать данные в определенную таблицу. Он подготовил всю информацию: что откуда нужно брать, когда нужно запускаться, с каким SLA. Вот пример того, как мы могли бы описывать наш пайплайн.

В dag_id передается уникальное название пайплайна. Дальше мы с помощью schedule_interval указываем, как часто он должен запускаться.

Очень важный момент: поскольку Airflow разрабатывался международной компанией, он работает только по UTC. На текущий момент нет вменяемого способа заставить Airflow работать в другом часовом поясе, поэтому нужно постоянно помнить про разницу нашего часового пояса с UTC. В версии 1.10.10 появилась возможность менять таймзону в UI, однако это касается только веб-интерфейса, пайплайны все равно будут запускаться по UTC.

Параметр default_args – это словарь, в котором описываются аргументы по умолчанию для всех задач в рамках этого пайплайна. Название большинства параметров хорошо себя описывает, я не буду на них останавливаться.

Оператор

Оператор – это Python класс, который описывает, какие действия надо совершить в рамках нашей ежедневной задачи, чтобы порадовать аналитика.

Мы можем использовать HiveOperator, который, как ни странно, создан для того, чтобы отправлять запросы на выполнение в Hive. Для запуска оператора нужно указать название задачи, пайплайн, идентификатор соединения к Hive и выполняемый запрос.

В запросе, который мы передаем в конструктор оператора, есть кусочек Jinja-шаблона. Jinja – это библиотека Python для шаблонизации.

Каждый запуск пайплайна хранит информацию о дате запуска. Она лежит в переменной под названием execution_date. << ds >> – это макрос, который возьмет в execution_date только дату в формате %Y-%m-%d. В определенный момент перед запуском оператора Airflow отрендерит строку запроса, подставит туда нужную дату и отправит запрос на выполнение.

ds – это не единственный макрос, их порядка 20 (список всех доступных макросов). Они включают в себя разные форматы дат и парочку функций для работы с датами – прибавить или отнять сколько-то дней.

Когда я познакомился с Airflow, то не понимал, зачем нужны всякие макросы, когда можно просто вставить туда вызов datetime.now() и радоваться жизни. Но в некоторых кейсах это может сильно портить жизнь как нам, так и аналитику. Например, если мы захотим пересчитать что-то за какую-то дату в прошлом, Airflow подставит туда не дату запуска пайплайна, а фактическое время выполнения. И в некоторых случаях мы можем получить не то, что ожидаем.

Например, если захотим перезапустить пайплайн за прошлый вторник, то при использовании datetime.now() мы на самом деле пересчитаем пайплайн за сегодняшний день, а не за нужную дату. Плюс ко всему, сегодняшние данные к этому моменту могут быть даже не готовы.

После успешного выполнения запроса мы можем отправить уведомление в slack о загрузке данных. Дальше командуем Airflow, в каком порядке запускать задачи. Благодаря перегрузке операторов в Airflow, я легко с помощью оператора >> указываю порядок шагов в пайплайне. В моём примере мы говорим, что сначала запустим выполнение запроса, потом отправку нотификации в slack.

Читайте также:  что нужно делать 6 января перед рождеством

Идемпотентность

Невозможно рассказать про Airflow, не упомянув про идемпотентность. На всякий случай напомню: идемпотентность – это свойство объекта при повторном применении операции к объекту всегда возвращать один и тот же результат.

В контексте Airflow это значит, что если сегодня пятница, а мы перезапускаем задачу за прошлый вторник, то задача запустится так, как будто для нее сейчас прошлый вторник, и никак иначе. То есть запуск или перезапуск задачи за какую-то дату в прошлом никак не должен зависеть от того, когда эта задача фактически запускается. Идемпотентность реализуется с помощью вышеупомянутой переменной execution_date.

Airflow разрабатывался как инструмент для решения задач, связанных с обработкой данных. В этом мире мы обычно обрабатываем крупную порцию данных только тогда, когда она готова, то есть на следующий день. И создатели Airflow изначально изложили такую концепцию в своих продуктах.

Когда мы запускаем ежедневный пайплайн, то с большой вероятностью захотим обрабатывать данные за вчера. Именно поэтому execution_date будет равен левой границе интервала, за которой мы обрабатываем данные. Например, сегодняшний запуск, который стартовал в час ночи по UTC, получит в качестве execution_date вчерашнюю дату. В случае ежечасного пайплайна ситуация такая же: для запуска пайплайна в 6 утра время в execution_date будет равно 5 часам утра. Это мысль поначалу не очень очевидна, но тем не менее, она очень осмысленная и важная.

Самые распространенные операторы Airflow

В Airflow есть не только операторы, которые ходят в Hive и отправляют что-то в slack. На самом деле, есть огромное множество операторов. В статью я вынес самые популярные и полезные.

Какие операторы мы используем в Lamoda

Другие полезности Airflow

Далее видно, как в течение дня менялась длительность выполнения задач. В нашем случае это процесс перекладки данных из Kafka в Hive с проверкой качества данных. Плюс можно проследить, когда задача почему-то выполнялась дольше обычного.

Как преуспеть в разработке на Airflow

Ниже я привел несколько советов, которые помогут не выстрелить себе в ногу при использовании Airflow:

Генерация дагов: генератор

С начала использования Airflow мы держали конфиги пайплайнов отдельно от кода. Изначально это было связано с особенностями схемы деплоя, но постепенно этот подход прижился. И сейчас мы используем конфиги везде, где есть намек на шаблонность. Особенно у нас это касается Spark джобов, которые мы запускаем из Docker. Из этого получилась история с декларативным написанием пайплайнов.

Подход заключается в том, что у нас есть директория с конфигами. В каждом конфиг-файле лежит один или несколько пайплайнов с их описанием: как они должны работать, когда запускаться, какие в нем задачи и в каком порядке их нужно выполнить.

Я покажу, как выглядит код вызова нашего генератора пайплайна. На вход он получает директорию с конфигами, prefix и класс, который будет отвечать за наполнение пайплайна задачами. Под капотом генератор сходит в указанную директорию, найдет там конфиг-файлы, и для каждого пайплайна в этих файлах создаст задачи и свяжет их.

Примерно так выглядит типичный конфиг-файл. Для описания конфигов мы используем формат HOCON, который является надмножеством JSON. Он поддержкивает импорты других HOCON файлов и может ссылаться на значения других переменных.

В конфиге на уровне пайплайна (блок attribution) можно указать много параметров, но самым важным являются name, start_date и schedule_interval.

Тут можно указать concurrency – сколько задач будет одновременно бежать в одном запуске. С недавних пор мы добавляем сюда блок с кратким markdown-описанием пайплайна. Потом оно вместе с остальной информацией о пайплайне отправится в Confluence (отправку мы реализовали с помощью Foliant). Получилось супер-удобно: так мы экономим время разработчиков дагов на создание страниц в Confluence.

Далее идет часть, которая отвечает за формирование задач. Сначала мы в блоке connections указываем, из какого connection в Airflow нужно брать параметры для подключения к внешнему источнику – в примере это наш DWH.

Вся необходимая информация типа пользователя, пароля, URL и так далее пробросится в docker-контейнер в качестве переменных окружений. В блоке Containers указываем, какие задачи мы будем запускать. Внутри есть название образа, список используемых connection и список переменных окружений.

Можно заметить, что в значениях некоторых переменных окружения фигурируют Jinja-шаблоны. Для указания очереди в YARN мы используем стандартный синтаксис Airflow для получения значений переменных. Для указания даты запуска используем макрос << ds_nodash >>, который представляет собой дату их execution_date без дефисов. В конфиге перечислены еще 3 похожие задачи, они скрыты для наглядности.

Дальше с помощью tasks мы указываем, как эти задачи будут запускаться. Можно заметить, что они перечислены как список в списке. Это значит, что все 4 эти задачи будут запускаться параллельно друг с другом.

И последнее: мы указываем, от каких базовых пайплайнов зависит наш текущий DAG. Странные циферки и буковки в конце названий базовых дагов – это расписание, которое мы встраиваем в название пайплайна. Таким образом, наш пайплайн начнет заполняться только после того, как завершатся базовые даги и указанные в них задачи.

Вот что мы получаем после генерации:

Что мы хотим делать дальше

Во-первых, построить полноценный Feature environment. Сейчас у нас есть один девелоперский стенд для тестирования всех наших пайплайнов. И перед тестированием нужно убедиться, что dev-ландшафт сейчас свободен.

Недавно наша команда расширилась, и желающих прибавилось. Мы нашли временное решение проблемы и теперь сообщаем в Slaсk, когда занимаем dev. Это работает, но все-таки это узкое место в процессе разработки и тестирования.

Один из вариантов – переезд в Kubernetes. Например, при создании pull-request в master можно поднимать в Kubernetes отдельный namespace, куда разворачивать Airflow, деплоить код, потом раскидывать переменные, коннекшены. Разработчик после развертывания придет в свежесозданный инстанс Airflow и будет тестировать свои пайплайны. У нас есть наработки на эту тему, но руки не добрались до боевого Kubernetes-кластера, где мы могли бы это все запускать.

Второй вариант реализации Feature environment – организация репозитория с общей веткой develop, куда вливается код разработчиков и автоматически выкатывается на dev-ландшафт. Сейчас активно смотрим в сторону этой схемы.

Также мы хотим попробовать внедрить у себя плагины – штуки для расширения функциональности веб-интерфейса. Основная цель внедрения плагинов – построить диаграмму Ганта на уровне всего Airflow, то есть на уровне всех пайплайнов, а также построить граф зависимостей между разными пайплайнами.

Почему мы выбрали Airflow

Минусы Airflow, которые мы обнаружили

В каких случаях можно подружиться с Airflow

Источник

Apache Airflow: делаем ETL проще

Привет, я Дмитрий Логвиненко — Data Engineer отдела аналитики группы компаний «Везёт».

Я расскажу вам о замечательном инструменте для разработки ETL-процессов — Apache Airflow. Но Airflow настолько универсален и многогранен, что вам стоит присмотреться к нему даже если вы не занимаетесь потоками данных, а имеете потребность периодически запускать какие-либо процессы и следить за их выполнением.

Читайте также:  что значит мем directed by robert b weide

И да, я буду не только рассказывать, но и показывать: в программе много кода, скриншотов и рекомендаций.


Что обычно видишь, когда гуглишь слово Airflow / Wikimedia Commons

Оглавление

Введение

Apache Airflow — он прямо как Django:

— только лучше, да и сделан совсем для других целей, а именно (как написано до ката):

Мы используем Apache Airflow так:

До недавнего времени наши потребности покрывал один небольшой сервер на 32 ядрах и 50 GB оперативки. В Airflow при этом работает:

А о том, как мы расширялись, я напишу ниже, а сейчас давайте определим über-задачу, которую мы будем решать:

Есть три исходных SQL Server’а, на каждом по 50 баз данных — инстансов одного проекта, соответственно, структура у них одинаковая (почти везде, муа-ха-ха), а значит в каждой есть таблица Orders (благо таблицу с таким названием можно затолкать в любой бизнес). Мы забираем данные, добавляя служебные поля (сервер-источник, база-источник, идентификатор ETL-задачи) и наивным образом бросим их в, скажем, Vertica.

Часть основная, практическая (и немного теоретическая)

Зачем оно нам (и вам)

Informatica Power Center — крайне развесистая система, чрезвычайно производительная, со своими железками, собственным версионированием. Использовал я дай бог 1% её возможностей. Почему? Ну, во-первых, этот интерфейс где-то из нулевых психически давил на нас. Во-вторых, эта штуковина заточена под чрезвычайно навороченные процессы, яростное переиспользование компонентов и другие очень-важные-энтерпрайз-фишечки. Про то что стоит она, как крыло Airbus A380/год, мы промолчим.

SQL Server Integration Server — этим товарищем мы пользовались в своих внутрипроектных потоках. Ну а в самом деле: SQL Server мы уже используем, и не юзать его ETL-тулзы было бы как-то неразумно. Всё в нём в хорошо: и интерфейс красивый, и отчётики выполнения… Но не за это мы любим программные продукты, ох не за это. Версионировать его dtsx (который представляет собой XML с перемешивающимися при сохранении нодами) мы можем, а толку? А сделать пакет тасков, который перетащит сотню таблиц с одного сервера на другой? Да что сотню, у вас от двадцати штук отвалится указательный палец, щёлкающий по мышиной кнопке. Но выглядит он, определенно, более модно:

Мы безусловно искали выходы. Дело даже почти дошло до самописного генератора SSIS-пакетов.

… а потом меня нашла новая работа. А на ней меня настиг Apache Airflow.

Когда я узнал, что описания ETL-процессов — это простой Python-код, я только что не плясал от радости. Вот так потоки данных подверглись версионированию и диффу, а ссыпать таблицы с единой структурой из сотни баз данных в один таргет стало делом Python-кода в полтора-два 13” экрана.

Собираем кластер

Давайте не устраивать совсем уж детский сад, и не говорить тут о совершенно очевидных вещах, вроде установки Airflow, выбранной вами БД, Celery и других дел, описанных в доках.

Чтобы мы могли сразу приступить к экспериментам, я набросал docker-compose.yml в котором:

Кое-где код в примерах приведен не полностью (чтобы не загромождать текст), а где-то он модифицируется в процессе. Цельные работающие примеры кода можно посмотреть в репозитории https://github.com/dm-logv/airflow-tutorial.

Ну а теперь просто:

После того, как всё поднимется, можно смотреть на веб-интерфейсы:

Основные понятия

Если вы ничего не поняли во всех этих «дагах», то вот краткий словарик:

Scheduler — самый главный дядька в Airflow, контролирующий, чтобы вкалывали роботы, а не человек: следит за расписанием, обновляет даги, запускает таски.

Вообще, в старых версиях, у него были проблемы с памятью (нет, не амнезия, а утечки) и в конфигах даже остался легаси-параметр run_duration — интервал его перезапуска. Но сейчас всё хорошо.

DAG (он же «даг») — «направленный ацикличный граф», но такое определение мало кому что скажет, а по сути это контейнер для взаимодействующих друг с другом тасков (см. ниже) или аналог Package в SSIS и Workflow в Informatica.

Помимо дагов еще могут быть сабдаги, но мы до них скорее всего не доберёмся.

Operator — это кусочки кода, ответственные за выполнение какого-либо конкретного действия. Есть три типа операторов:

Task — объявленные операторы вне зависимости от типа и прикрепленные к дагу повышаются до чина таска.

Task instance — когда генерал-планировщик решил, что таски пора отправлять в бой на исполнители-воркеры (прямо на месте, если мы используем LocalExecutor или на удалённую ноду в случае с CeleryExecutor ), он назначает им контекст (т. е. комплект переменных — параметров выполнения), разворачивает шаблоны команд или запросов и складывает их в пул.

Генерируем таски

Сперва обозначим общую схему нашего дага, а затем будем всё больше и больше погружаться в детали, потому что мы применяем некоторые нетривиальные решения.

Итак, в простейшем виде подобный даг будет выглядеть так:

Пока на этом всё. Что мы получили:


Зависимости кто будет ставить?

Чтобы всё это дело упростить я вкорячил в docker-compose.yml обработку requirements.txt на всех нодах.

Вот теперь понеслась:

Серые квадратики — task instances, обработанные планировщиком.

Немного ждем, задачи расхватывают воркеры:

Зеленые, понятное дело, — успешно отработавшие. Красные — не очень успешно.

Немного о Flower

Пока воркеры молотят наши тасочки-пустышки, вспомним про еще один инструмент, который может нам кое-что показать — Flower.

Самая первая страничка с суммарной информацией по нодам-воркерам:

Самая насыщенная страничка с задачами, отправившимися в работу:

Самая скучная страничка с состоянием нашего брокера:

Самая яркая страничка — с графиками состояния тасков и их временем выполнения:

Догружаем недогруженное

Итак, все таски отработали, можно уносить раненых.

А раненых оказалось немало — по тем или иным причинами. В случае правильного использования Airflow вот эти самые квадраты говорят о том, что данные определенно не доехали.

Нужно смотреть лог и перезапускать упавшие task instances.

Жмякнув на любой квадрат, увидим доступные нам действия:

Можно взять, и сделать Clear упавшему. То есть, мы забываем о том, что там что-то завалилось, и тот же самый инстанс таска уйдет планировщику.

Выберем всё разом и обнулим нажмем правильный пункт:

После очистки наши такси выглядят так (они уже ждут не дождутся, когда шедулер их запланирует):

Соединения, хуки и прочие переменные

Самое время посмотреть на следующий DAG, update_reports.py :

Все ведь когда-нибудь делали обновлялку отчетов? Это снова она: есть список источников, откуда забрать данные; есть список, куда положить; не забываем посигналить, когда всё случилось или сломалось (ну это не про нас, нет).

Давайте снова пройдемся по файлу и посмотрим на новые непонятные штуки:

report_update >> [email, tg] — все VerticaOperator сойдутся в отправке письма и сообщения, вот так:

Но так как у операторов-нотификаторов стоят разные условия запуска, работать будет только один. В Tree View всё выглядит несколько менее наглядно:

Скажу пару слов о макросах и их друзьях — переменных.

Макросы — это Jinja-плейсхолдеры, которые могут подставлять разную полезную информацию в аргументы операторов. Например, так:

Читайте также:  что делать если не сохнет эпоксидная смола

Присвоенные значения можно смотреть с помощью кнопки Rendered на каждом таск-инстансе. Вот так у таска с отправкой письма:

А так у таски с отправкой сообщения:

Полный список встроенных макросов для последней доступной версии доступен здесь: Macros Reference

Более того, с помощью плагинов, мы можем объявлять собственные макросы, но это уже совсем другая история.

Помимо предопределенных штук, мы можем подставлять значения своих переменных (выше в коде я уже этим воспользовался). Создадим в Admin/Variables пару штук:

Всё, можно пользоваться:

В значении может быть скаляр, а может лежать и JSON. В случае JSON-а:

Скажу буквально одно слово и покажу один скриншот про соединения. Тут всё элементарно: на странице Admin/Connections создаем соединение, складываем туда наши логины/пароли и более специфичные параметры. Вот так:

Пароли можно шифровать (более тщательно, чем в варианте по умолчанию), а можно не указывать тип соединения (как я сделал для tg_main ) — дело в том, что список типов зашит в моделях Airflow и расширению без влезания в исходники не поддается (если вдруг я чего-то не догуглил — прошу меня поправить), но получить креды просто по имени нам ничто не помешает.

Variables и Connections, безусловно, классные средства, но важно не потерять баланс: какие части ваших потоков вы храните собственно в коде, а какие — отдаете на хранение Airflow. C одной стороны быстро поменять значение, например, ящик рассылки, может быть удобно через UI. А с другой — это всё-таки возврат к мышеклику, от которого мы (я) хотели избавиться.

Разбираем кастомный оператор

И мы вплотную подобрались к тому, чтобы посмотреть на то, как сделан TelegramBotSendMessage

Код commons/operators.py с собственно оператором:

Здесь, как и остальное в Airflow, всё очень просто:

Я даже не знаю, что тут можно объяснять, просто отмечу важные моменты:

Пока мы всё это изучали, наши обновления отчетов успели успешно завалиться и отправить мне в канал сообщение об ошибке. Пойду проверять, что опять не так.


В нашем даге что-то сломалось! А ни этого ли мы ждали? Именно!

Наливать-то будешь?

Чувствуете, что-то я пропустил? Вроде бы обещал данные из SQL Server в Vertica переливать, и тут взял и съехал с темы, негодяй!

Злодеяние это было намеренным, я просто обязан был расшифровать вам кое-какую терминологию. Теперь можно ехать дальше.

План у нас был такой:

Итак, чтобы всё это запустить, я сделал маленькое дополнение к нашему docker-compose.yml :

Запускаем всё добро с помощью чуть более сложной, чем в прошлый раз, команды:

Что нагенерировал наш чудорандомайзер, можно, воспользовавшись пунктом Data Profiling/Ad Hoc Query :


Главное, не показывать это аналитикам

Подробно останавливаться на ETL-сессиях я не буду, там всё тривиально: делаем базу, в ней табличку, оборачиваем всё менеджером контекста, и теперь делаем так:

Настала пора забрать наши данные из наших полутора сотен таблиц. Сделаем это с помощью очень незатейливых строчек:

Посмотрим, чем Airflow нашпиговал аргументы наших функций:

Если данных не оказалось, то продолжать смысла нет. Но считать заливку успешной тоже странно. Но это и не ошибка. А-а-а, что делать?! А вот что:

AirflowSkipException скажет Airflow, что ошибки, собственно нет, а таск мы пропускаем. В интерфейсе будет не зеленый и не красный квадратик, а цвета pink.

Подбросим нашим данным несколько колонок:

Остался предпоследний шаг: залить всё в Vertica. А, как ни странно, один из самых эффектных эффективных способов сделать это — через CSV!

Из драйвера заберем, сколько строчек засыпалось, и скажем менеджеру сессии, что всё ОК:

На проде мы создаем целевую табличку вручную. Здесь же я позволил себе небольшой автомат:

Я с помощью VerticaOperator() создаю схему БД и таблицу (если их еще нет, естественно). Главное, правильно расставить зависимости:

Подводим итоги

— Ну вот, — сказал мышонок, — не правда ли, теперь
Ты убедился, что в лесу я самый страшный зверь?

Джулия Дональдсон, «Груффало»

Думаю, если бы мы с моими коллегами устроили соревнование: кто быстрее составит и запустит с нуля ETL-процесс: они со своими SSIS и мышкой и я с Airflow… А потом бы мы еще сравнили удобство сопровождения… Ух, думаю, вы согласитесь, что я обойду их по всем фронтам!

Если же чуть-чуть посерьезнее, то Apache Airflow — за счет описания процессов в виде программного кода — сделал мою работу гораздо удобнее и приятнее.

Его же неограниченная расширяемость: как в плане плагинов, так и предрасположенность к масштабируемости — даёт вам возможность применять Airflow практически в любой области: хоть в полном цикле сбора, подготовки и обработки данных, хоть в запуске ракет (на Марс, конечно же).

Часть заключительная, справочно-информационная

Грабли, которые мы собрали за вас

И больше никаких проблем.

Всё на одной машине. Да, и базы (самого Airflow и нашей обмазки), и веб-сервер, и планировщик, и воркеры. И оно даже работало. Но со временем количество задач у сервисов росло, и когда PostgreSQL стал отдавать ответ по индексу за 20 с вместо 5 мс, мы его взяли и унесли.

LocalExecutor. Да, мы сидим на нём до сих пор, и мы уже подошли к краю пропасти. LocalExecutor’а нам до сих пор хватало, но сейчас пришла пора расшириться минимум одним воркером, и придется поднапрячься, чтобы переехать на CeleryExecutor. А ввиду того, что с ним можно работать и на одной машиной, то ничего не останавливает от использования Celery даже не сервере, который «естественно, никогда не пойдет в прод, чесслово!»

Неиспользование встроенных средств:

Злоупотребление почтой. Ну что тут сказать? Были настроены оповещения на все повторы упавших тасков. Теперь в моём рабочем Gmail >90k писем от Airflow, и веб-морда почты отказывается брать и удалять больше чем по 100 штук за раз.

Средства ещё большей автоматизации

Для того чтобы нам еще больше работать головой, а не руками, Airflow заготовила для нас вот что:

REST API — он до сих пор имеет статус Experimental, что не мешает ему работать. С его помощью можно не только получать информацию о дагах и тасках, но остановить/запустить даг, создать DAG Run или пул.

CLI — через командную строку доступны многие средства, которые не просто неудобны в обращении через WebUI, а вообще отсутствуют. Например:

Подключение к базе метаданных Airflow. Писать в неё я не рекомендую, а вот доставать состояния тасков для различных специфических метрик можно значительно быстрее и проще, чем через любой из API.

Скажем, далеко не все наши таски идемпотентны, а могут иногда падать и это нормально. Но несколько завалов — это уже подозрительно, и надо бы проверить.

Ссылки

Ну и естественно первые десять ссылок из выдачи гугла содержимое папки Airflow из моих закладок.

И ссылки, задействованные в статье:

Источник

Строительный портал