„Разговор за Europython 2023“

Тази статия служи като препис за моя разговор на EuroPython 2023 „Оркестриране на Python работни потоци в Apache Airflow“:

Какво е Apache Airflow?

Apache Airflow е оркестратор на работни потоци, което означава, че изпълнява код по определен начин, в определено време и го прави многократно.

Сред нещата, които правят Airflow страхотен оркестратор, са:

  • Този проект е с „отворен код от Airbnb“ и сега е част от фондация Apache, което означава, че можете да го „използвате безплатно“.
  • Той е изграден в Python, което го прави лесен за инсталиране, използване и дори допринасяне.
  • Потребителският интерфейс на Airflow е много достъпен и е една от силните му страни, което го прави използваем дори за нетехнически персонал.
  • С ~500 месечни ангажименти, OSS ранг #20 и много ежедневни въпроси за Stack Overflow, Airflow има силна общност от активни сътрудници и ежедневни потребители.

Инсталация и компоненти

Инсталирането на Apache Airflow е доста лесно:

pip install apache-airflow

С тази проста команда получавате следната архитектура на вашата локална машина:

Отказ от отговорност: Моля, имайте предвид, че това е урок за разработчици и много конфигурационни елементи, необходими за производствена употреба, ще липсват в тази статия.

База данни за въздушния поток

Airflow инсталира и използва sqlite по подразбиране при инсталиране. Тази DB съхранява DAG метаданни, исторически данни от цикли на изпълнение, потребителски данни и всичко останало, до което Airflow има нужда от бърз достъп.

При първата настройка обикновено искате да инициализирате DB:

airflow db init

И създайте потребител за себе си с airflow users create, не забравяйте да си дадете Admin роля, за да имате пълен достъп.

Уеб сървър на Airflow

Airflow изпълнява потребителския интерфейс на собствен уеб сървър Gunicorn, който можете да стартирате локално с командата airflow webserver.

След това, като отидете на адреса на уеб сървъра (localhost:8080 по подразбиране) и влезете с потребителя, създаден преди това, ще имате достъп до потребителския интерфейс:

Началната страница е списък с всички налични работни места в Airflow, щракването върху което и да е задание ще ви даде исторически показатели, подробности и времена на изпълнение.

Планировчик на въздушния поток

Всички налични задания за въздушен поток (известни още като DAG) се изпълняват редовно от планировчика в зависимост от техния график. За да стартирате планировчика, трябва да използвате командата airflow scheduler или да стартирате планировчика като демон.

По подразбиране планировчикът е настроен да използва SequentialExecutor, който изпълнява едно задание наведнъж. Това ограничение е свързано с използването на sqlite DB по подразбиране. И двете настройки трябва да бъдат променени за едновременно изпълнение.

Директория на Airflow Dag

При инсталацията airflow ще създаде папка airflow във вашата директория $HOME. Като алтернатива можете да използвате променливата AIRFLOW_HOME environment, за да превключите това на всяко желано място. За да добавите нови задания към вашето копие на Airflow, просто създайте папка dags в папката Airflow:

mkdir dags

И поставете вашите Python файлове с DAG дефиниция в него.

Обекти на въздушния поток

Airflow използва 3 основни обекта, за да дефинира задача:

  • DAG → Насочени ациклични графики
  • Задачи → Атомарни, идемпотентни операции
  • Оператори → Python обекти, които всъщност изпълняват задачите

Както се вижда на изображението по-горе, DAG се състои от много задачи, които се изпълняват отляво надясно с определени връзки на зависимост.

Airflow основно кодира DAG, които са съставени от задачи, които се управляват от оператори.

Куки

Работните процеси на Python могат да използват множество ресурси: бази данни, API, HTTP извиквания, външни програми…

„Куките“ за въздушен поток са интерфейс от високо ниво, който използва „Връзки за въздушен поток“, за да получи достъп до такива ресурси. Те често използват външни библиотеки или досадни мрежови операции „под капака“.

Оператори

Airflow разполага с много обширен набор от оператори, ето някои примери:

Операторите обикновено се конструират с помощта на куки за въздушен поток. Те се изпълняват от Airflow, а не от потребителя. Тяхната структура и използване гарантира повторна употреба с много малки промени в кода.

Наследяване на въздушен поток

Всички класове на въздушния поток наследяват основни черти от модел Base:

  • „Базовият оператор“
  • „Базовата кука“

По този начин, когато се създава обект на въздушен поток, трябва да се адаптира техният дизайн, за да пасне на съществуващата форма.

Създаване на персонализиран оператор: RedditMessageOperator

Ето хипотетичен случай на употреба: стартирали сте работен процес, който отнема известно време да се изпълнява и не можете да възобновите работата си, докато не приключи, намирате релаксиращо място за разглеждане на мемове в Reddit, докато не приключи, но се нуждаете от някой, който да ви предупреди, когато стане приключи, за да възобнови работа.

Въведете RedditMessageOperator!

Персонализиран оператор, който ни изпраща съобщение за възобновяване на работа. Той ще бъде извикан автоматично в края на нашия работен процес.

Създаване на кука

Започваме с внедряване на RedditHook, който ще използва praw (Python Reddit API Wrapper), за да се свърже с Reddit.

Не забравяйте да наследите от BaseHook и да извикате неговия конструктор при инициализиране.

Избрах да отменя метода get_conn, но можете да приложите нов метод, ако желаете.

Създаване на оператор

За разлика от куките, които се използват по различни начини в операторите, операторите се извикват от вътрешната логика на Airflow, която използва метода execute като входна точка.

Така че създаваме нашия RedditMessageOperator чрез наследяване от BaseOperator и заместваме задължителния метод execute:

И сега имаме оператор, който при изпълнение ще използва връзка за изпращане на съобщение до целеви потребител.

Сега нека направим работен процес на Python, който използва нашия персонализиран оператор!

Правене на Dag

Airflow е просто код на Python. По същия начин DAG са само код на python:

Ето преглед на това, което правим:

  • Редове 1–5:Импортиране, datetime , timedelta и DAG винаги трябва да са там, както и операторите, които използваме. Тук ще използваме EmptyOperator за симулиране на времеемка операция и нашия RedditMessageOperator за тестване.
  • Редове 7–25: Конфигурация, задаване на стойности по подразбиране, имейли за наблюдение или честоти на графика.
  • Редове 27–34: DAG декларация, създавайки нашия DAG обект с гореспоменатите стойности по подразбиране.
  • Редове 35–44: Декларация на задача, създаване на нашите различни обекти на задача чрез извиквания на оператор и задаване на реда (ред 44) между тях.

Виждайки го в действие

Ето резултатите:

За повече информация можете да гледате моя разговор за EP2023, да намерите кода в Github или просто да се свържете с мен.