Оператор Python в Apache Airflow практическое руководство для разработчиков

Для создания задач в Apache Airflow используйте оператор PythonOperator. Он позволяет выполнять произвольный Python-код внутри DAG. Например, чтобы напечатать сообщение в логах, достаточно определить функцию и передать её в оператор:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def print_hello():
print("Привет, Airflow!")
dag = DAG('example_dag', description='Пример DAG', schedule_interval='@daily', start_date=datetime(2023, 1, 1))
task = PythonOperator(task_id='print_hello_task', python_callable=print_hello, dag=dag)

def print_message(message):
print(message)
task = PythonOperator(task_id='print_message_task', python_callable=print_message, op_kwargs={'message': 'Динамическое сообщение'}, dag=dag)

Для работы с зависимостями между задачами используйте метод set_upstream или оператор >>. Например, чтобы задача task_b выполнялась после task_a, добавьте:

task_a >> task_b

Если ваш код требует внешних зависимостей, убедитесь, что они установлены в среде выполнения Airflow. Для этого можно использовать виртуальное окружение или добавить зависимости в файл requirements.txt.

Используйте PythonOperator для выполнения сложных задач, таких как обработка данных, вызов API или работа с базами данных. Однако для задач, связанных с выполнением внешних команд или скриптов, лучше подойдёт BashOperator.

Создание пользовательского Python-оператора

Определите класс, который наследует BaseOperator из модуля airflow.models. Это основа для создания оператора. Внутри класса переопределите метод execute, который будет выполнять основную логику. Например:

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class CustomPythonOperator(BaseOperator):
@apply_defaults
def __init__(self, custom_param, *args, **kwargs):
super().__init__(*args, kwargs)
self.custom_param = custom_param
def execute(self, context):
print(f"Выполнение с параметром: {self.custom_param}")

Используйте декоратор @apply_defaults, чтобы автоматически применять стандартные параметры Airflow. Добавьте необходимые аргументы в метод __init__, чтобы передавать данные в оператор.

Для использования оператора в DAG, импортируйте его и добавьте в задачу. Укажите параметры, которые требуются для работы:

from airflow import DAG
from datetime import datetime
with DAG('custom_operator_example', start_date=datetime(2023, 1, 1)) as dag:
task = CustomPythonOperator(
task_id='custom_task',
custom_param='Пример значения'
)

Если оператор требует взаимодействия с внешними системами, добавьте соответствующие библиотеки в метод execute. Например, для работы с API используйте requests:

import requests
def execute(self, context):
response = requests.get('https://api.example.com/data')
print(response.json())

Сохраняйте код оператора в отдельном модуле, чтобы упростить его повторное использование. Импортируйте модуль в DAG, когда это необходимо. Это упрощает поддержку и масштабирование проекта.

Определение PythonOperator и его настройки

PythonOperator позволяет выполнять произвольный Python-код в рамках задачи Apache Airflow. Используйте его, когда нужно запустить функцию, которая не требует сложных зависимостей или внешних ресурсов. Для настройки оператора передайте функцию в параметр python_callable, а аргументы функции – в op_kwargs или op_args.

Пример создания задачи с PythonOperator:


from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_message(message):
print(message)
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 1, 1),
}
with DAG('example_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='print_message_task',
python_callable=print_message,
op_kwargs={'message': 'Привет, Airflow!'},
)

Убедитесь, что функция, передаваемая в python_callable, не содержит долгих операций или блокирующих вызовов. Для таких случаев лучше использовать специализированные операторы, например, BashOperator или KubernetesPodOperator.

PythonOperator поддерживает передачу контекста задачи через параметр provide_context=True. Это полезно, если нужно получить доступ к переменным, таким как execution_date или task_instance. Включите параметр kwargs в сигнатуру функции, чтобы использовать контекст.

Проверяйте логи выполнения задачи для отладки. Если функция завершается с ошибкой, Airflow автоматически отметит задачу как неудачную. Используйте try-except в коде функции для обработки исключений и логирования полезной информации.

Добавление зависимостей и импорт необходимых библиотек

Убедитесь, что все необходимые библиотеки установлены в среде выполнения вашего DAG. Для этого добавьте зависимости в файл requirements.txt или используйте виртуальное окружение. Например, для работы с Pandas и Requests укажите: pandas==2.0.3 и requests==2.31.0.

В начале вашего Python-скрипта импортируйте нужные модули. Для базовых задач Airflow достаточно стандартных импортов: from airflow import DAG и from airflow.operators.python_operator import PythonOperator. Если вы работаете с внешними API, добавьте import requests, а для обработки данных – import pandas as pd.

Если ваши задачи требуют специфических библиотек, таких как NumPy или SQLAlchemy, убедитесь, что они доступны в вашей среде. Для этого выполните команду pip install -r requirements.txt перед запуском DAG.

Проверьте совместимость версий библиотек, чтобы избежать конфликтов. Например, если вы используете Airflow 2.6, убедитесь, что все зависимости поддерживают эту версию. Для этого воспользуйтесь документацией библиотек или инструментами, такими как pipdeptree.

Если вы работаете в распределенной среде, убедитесь, что все worker-узлы имеют одинаковые версии библиотек. Это поможет избежать ошибок при выполнении задач на разных узлах.

Настройка параметров выполнения задачи

Укажите параметр retries в объекте задачи, чтобы определить количество повторных попыток в случае сбоя. Например, retries=3 позволяет задаче повториться три раза перед окончательным завершением с ошибкой.

Используйте retry_delay для задания интервала между повторными попытками. Это помогает избежать мгновенных повторных запусков, которые могут перегрузить систему. Пример:

  • retry_delay=timedelta(minutes=5) – повтор через 5 минут.

Настройте параметр execution_timeout, чтобы ограничить время выполнения задачи. Если задача превышает указанное время, она автоматически завершается. Пример:

  • execution_timeout=timedelta(hours=2) – максимальное время выполнения 2 часа.

Для управления зависимостями между задачами используйте depends_on_past. Если установить значение True, задача будет выполняться только при успешном завершении предыдущего запуска.

Добавьте параметр trigger_rule, чтобы изменить стандартное поведение запуска задачи. Например, trigger_rule='all_done' позволяет задаче запуститься, даже если предыдущие задачи завершились с ошибкой.

Укажите pool для распределения задач между пулами ресурсов. Это полезно, если у вас ограниченные ресурсы или задачи с разным уровнем приоритета. Пример:

  • pool='high_priority_pool' – выполнение задачи в пуле с высоким приоритетом.

Используйте priority_weight для управления очередностью выполнения задач. Задачи с большим значением этого параметра выполняются раньше. Пример:

  • priority_weight=10 – высокий приоритет задачи.

Добавьте sla для отслеживания времени выполнения задачи. Если задача превышает указанное время, в логах фиксируется нарушение SLA. Пример:

  • sla=timedelta(minutes=30) – максимальное время выполнения 30 минут.

Настройте on_failure_callback для выполнения определенных действий при сбое задачи. Это может быть отправка уведомления или логирование ошибки. Пример:

  • on_failure_callback=send_alert – вызов функции send_alert при ошибке.

Используйте on_success_callback для выполнения действий при успешном завершении задачи. Например, обновление статуса в базе данных или отправка отчета.

Основные практические примеры использования Python-оператора

Используйте PythonOperator для выполнения задач, которые требуют гибкости и логики, например, обработки данных или взаимодействия с внешними API. Например, для извлечения данных из базы данных и их последующей обработки:


from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
def process_data():
# Логика обработки данных
print("Данные успешно обработаны")
dag = DAG('example_dag', start_date=datetime(2023, 1, 1))
task = PythonOperator(
task_id='process_data_task',
python_callable=process_data,
dag=dag
)

Для взаимодействия с внешними API создайте функцию, которая отправляет запрос и обрабатывает ответ. Например, для получения данных о погоде:


import requests
def fetch_weather():
response = requests.get('https://api.weatherapi.com/v1/current.json?key=your_key&q=Moscow')
data = response.json()
print(f"Текущая температура в Москве: {data['current']['temp_c']}°C")
task = PythonOperator(
task_id='fetch_weather_task',
python_callable=fetch_weather,
dag=dag
)

PythonOperator также удобен для выполнения задач, связанных с машинным обучением. Например, для загрузки модели и предсказания:


import joblib
def predict():
model = joblib.load('model.pkl')
prediction = model.predict([[1, 2, 3]])
print(f"Предсказание: {prediction}")
task = PythonOperator(
task_id='predict_task',
python_callable=predict,
dag=dag
)

Для работы с файлами, например, для чтения и записи данных, используйте стандартные методы Python. Пример для чтения CSV-файла:


import pandas as pd
def read_csv():
data = pd.read_csv('data.csv')
print(data.head())
task = PythonOperator(
task_id='read_csv_task',
python_callable=read_csv,
dag=dag
)

PythonOperator позволяет интегрировать любую логику, которая может быть реализована на Python, что делает его универсальным инструментом в Airflow.

Простой пример: Выполнение функции в Python-операторе

def print_message():
print("Задача успешно выполнена!")

Используйте оператор PythonOperator, чтобы интегрировать эту функцию в DAG. Укажите функцию в параметре python_callable:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1),
}
with DAG('example_dag', default_args=default_args, schedule_interval='@daily') as dag:
task = PythonOperator(
task_id='print_message_task',
python_callable=print_message
)

Если функция требует аргументов, передайте их через параметр op_kwargs. Например:

def custom_message(text):
print(f"Сообщение: {text}")
task_with_args = PythonOperator(
task_id='custom_message_task',
python_callable=custom_message,
op_kwargs={'text': 'Привет, Airflow!'}
)

Этот подход позволяет гибко настраивать задачи, используя Python-функции с минимальными усилиями.

Сложный пример: Чтение и запись данных с использованием внешних API

Для работы с внешними API в Apache Airflow создайте кастомный оператор, который будет выполнять HTTP-запросы и обрабатывать ответы. Начните с импорта необходимых библиотек: requests для HTTP-запросов и BaseOperator для создания оператора.


from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import requests
class APIDataOperator(BaseOperator):
@apply_defaults
def __init__(self, api_url, method='GET', headers=None, params=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_url = api_url
self.method = method
self.headers = headers
self.params = params
def execute(self, context):
response = requests.request(
method=self.method,
url=self.api_url,
headers=self.headers,
params=self.params
)
if response.status_code == 200:
return response.json()
else:
raise ValueError(f"Ошибка при запросе: {response.status_code}")

Используйте этот оператор в DAG для получения данных. Например, запросите данные с API погоды и сохраните их в XCom для дальнейшего использования.


from airflow import DAG
from datetime import datetime
from operators.api_data_operator import APIDataOperator
default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1),
}
with DAG('weather_data_dag', default_args=default_args, schedule_interval='@daily') as dag:
fetch_weather = APIDataOperator(
task_id='fetch_weather',
api_url='https://api.weatherapi.com/v1/current.json',
params={'key': 'your_api_key', 'q': 'Moscow'},
method='GET'
)

Для записи данных в другой API создайте аналогичный оператор, который будет отправлять POST-запросы. Например, отправьте данные в сервис для хранения логов.


class PostDataOperator(BaseOperator):
@apply_defaults
def __init__(self, api_url, data, headers=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.api_url = api_url
self.data = data
self.headers = headers
def execute(self, context):
response = requests.post(
url=self.api_url,
json=self.data,
headers=self.headers
)
if response.status_code != 200:
raise ValueError(f"Ошибка при отправке данных: {response.status_code}")

Добавьте этот оператор в DAG для отправки данных, полученных на предыдущем шаге.


send_logs = PostDataOperator(
task_id='send_logs',
api_url='https://logservice.com/api/logs',
data="{{ ti.xcom_pull(task_ids='fetch_weather') }}",
headers={'Content-Type': 'application/json'}
)
fetch_weather >> send_logs

Для обработки ошибок добавьте логику повторных попыток и уведомлений. Используйте параметры retries и retry_delay в DAG, а также настройте уведомления через on_failure_callback.


default_args = {
'owner': 'airflow',
'start_date': datetime(2023, 10, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
'on_failure_callback': notify_failure
}

Этот подход позволяет гибко работать с внешними API, обрабатывать данные и интегрировать их в ваши рабочие процессы Airflow.

Отладка и мониторинг выполнения задач с PythonOperator

Для отладки задач, выполняемых через PythonOperator, используйте логирование с помощью модуля logging. Это позволяет отслеживать состояние выполнения задачи и выявлять ошибки. Добавьте следующие строки в ваш Python-скрипт:

import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def my_task():
logger.info("Задача начата")
# Ваш код
logger.info("Задача завершена")

Логи можно просматривать в интерфейсе Airflow или через командную строку с помощью команды airflow tasks logs.

Для мониторинга выполнения задач используйте встроенные инструменты Airflow:

  • DAG Runs: Проверяйте статус выполнения DAG на странице «DAG Runs» в веб-интерфейсе.
  • Task Instances: Отслеживайте состояние отдельных задач на вкладке «Task Instances».
  • Граф выполнения: Визуализируйте выполнение задач на странице «Graph View».

Если задача завершается с ошибкой, проверьте её стектрейс в интерфейсе Airflow. Это поможет быстро определить причину сбоя. Для более детального анализа используйте параметр retries в PythonOperator, чтобы задача автоматически перезапускалась при возникновении ошибки:

task = PythonOperator(
task_id='my_task',
python_callable=my_task,
retries=3,
retry_delay=timedelta(minutes=5),
dag=dag
)

Для анализа производительности задач используйте метрики Airflow, такие как время выполнения задачи и количество успешных/неудачных попыток. Эти данные доступны в разделе «Metrics» или через интеграцию с внешними системами мониторинга, например, Prometheus.

Если задача выполняется слишком долго, проверьте её код на наличие узких мест. Используйте профилирование с помощью модуля cProfile или библиотеки line_profiler для анализа времени выполнения отдельных строк кода.

Понравилась статья? Поделить с друзьями:
0 0 голоса
Рейтинг статьи
Подписаться
Уведомить о
guest

0 комментариев
Старые
Новые Популярные
Межтекстовые Отзывы
Посмотреть все комментарии