This is an automated email from the ASF dual-hosted git repository. amoghdesai pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
     new 2a907ef08e5 Adding a task that validates format of dates in YDB example dag (#46807)
2a907ef08e5 is described below

commit 2a907ef08e5c7146ca2b4f2090a31db12e164301
Author: Amogh Desai <amoghrajesh1...@gmail.com>
AuthorDate: Mon Feb 17 00:33:49 2025 +0530

    Adding a task that validates format of dates in YDB example dag (#46807)
---
 providers/ydb/tests/system/ydb/example_ydb.py | 27 +++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/providers/ydb/tests/system/ydb/example_ydb.py b/providers/ydb/tests/system/ydb/example_ydb.py
index cd2d4835d49..3526ca6cfa6 100644
--- a/providers/ydb/tests/system/ydb/example_ydb.py
+++ b/providers/ydb/tests/system/ydb/example_ydb.py
@@ -18,11 +18,13 @@ from __future__ import annotations
 
 import datetime
 import os
+import re
 
 import ydb
 
 from airflow import DAG
 from airflow.decorators import task
+from airflow.operators.python import PythonOperator
 from airflow.providers.ydb.hooks.ydb import YDBHook
 from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator
 
@@ -55,6 +57,19 @@ def populate_pet_table_via_bulk_upsert():
     hook.bulk_upsert("pet", rows=rows, column_types=column_types)
 
 
+def sanitize_date(value: str) -> str:
+    """Ensure the value is a valid date format"""
+    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", value):
+        raise ValueError(f"Invalid date format: {value}")
+    return value
+
+
+def transform_dates(**kwargs):
+    begin_date = sanitize_date(kwargs.get("begin_date"))
+    end_date = sanitize_date(kwargs.get("end_date"))
+    return {"begin_date": begin_date, "end_date": end_date}
+
+
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime.datetime(2020, 2, 2),
@@ -93,11 +108,18 @@ with DAG(
     # [START ydb_operator_howto_guide_get_all_pets]
     get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;")
     # [END ydb_operator_howto_guide_get_all_pets]
+    transform_dates = PythonOperator(
+        task_id="transform_dates",
+        python_callable=transform_dates,
+        op_kwargs={"begin_date": "{{params.begin_date}}", "end_date": "{{params.end_date}}"},
+        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
+    )
     # [START ydb_operator_howto_guide_get_birth_date]
     get_birth_date = YDBExecuteQueryOperator(
         task_id="get_birth_date",
-        sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'",
-        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
+        sql="""
+        SELECT * FROM pet WHERE birth_date BETWEEN '{{ ti.xcom_pull(task_ids="transform_dates")["begin_date"] }}' AND '{{ ti.xcom_pull(task_ids="transform_dates")["end_date"] }}'
+        """,
     )
     # [END ydb_operator_howto_guide_get_birth_date]
 
@@ -106,6 +128,7 @@ with DAG(
         >> populate_pet_table
         >> populate_pet_table_via_bulk_upsert()
         >> get_all_pets
+        >> transform_dates
         >> get_birth_date
     )
     # [END ydb_operator_howto_guide]