This is an automated email from the ASF dual-hosted git repository.

amoghdesai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 2a907ef08e5 Adding a task that validates format of dates in YDB example dag (#46807)
2a907ef08e5 is described below

commit 2a907ef08e5c7146ca2b4f2090a31db12e164301
Author: Amogh Desai <amoghrajesh1...@gmail.com>
AuthorDate: Mon Feb 17 00:33:49 2025 +0530

    Adding a task that validates format of dates in YDB example dag (#46807)
---
 providers/ydb/tests/system/ydb/example_ydb.py | 27 +++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/providers/ydb/tests/system/ydb/example_ydb.py b/providers/ydb/tests/system/ydb/example_ydb.py
index cd2d4835d49..3526ca6cfa6 100644
--- a/providers/ydb/tests/system/ydb/example_ydb.py
+++ b/providers/ydb/tests/system/ydb/example_ydb.py
@@ -18,11 +18,13 @@ from __future__ import annotations
 
 import datetime
 import os
+import re
 
 import ydb
 
 from airflow import DAG
 from airflow.decorators import task
+from airflow.operators.python import PythonOperator
 from airflow.providers.ydb.hooks.ydb import YDBHook
 from airflow.providers.ydb.operators.ydb import YDBExecuteQueryOperator
 
@@ -55,6 +57,19 @@ def populate_pet_table_via_bulk_upsert():
     hook.bulk_upsert("pet", rows=rows, column_types=column_types)
 
 
+def sanitize_date(value: str) -> str:
+    """Ensure the value is a valid date format"""
+    if not re.fullmatch(r"\d{4}-\d{2}-\d{2}", value):
+        raise ValueError(f"Invalid date format: {value}")
+    return value
+
+
+def transform_dates(**kwargs):
+    begin_date = sanitize_date(kwargs.get("begin_date"))
+    end_date = sanitize_date(kwargs.get("end_date"))
+    return {"begin_date": begin_date, "end_date": end_date}
+
+
 with DAG(
     dag_id=DAG_ID,
     start_date=datetime.datetime(2020, 2, 2),
@@ -93,11 +108,18 @@ with DAG(
     # [START ydb_operator_howto_guide_get_all_pets]
     get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT 
* FROM pet;")
     # [END ydb_operator_howto_guide_get_all_pets]
+    transform_dates = PythonOperator(
+        task_id="transform_dates",
+        python_callable=transform_dates,
+        op_kwargs={"begin_date": "{{params.begin_date}}", "end_date": 
"{{params.end_date}}"},
+        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
+    )
     # [START ydb_operator_howto_guide_get_birth_date]
     get_birth_date = YDBExecuteQueryOperator(
         task_id="get_birth_date",
-        sql="SELECT * FROM pet WHERE birth_date BETWEEN 
'{{params.begin_date}}' AND '{{params.end_date}}'",
-        params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
+        sql="""
+        SELECT * FROM pet WHERE birth_date BETWEEN '{{ 
ti.xcom_pull(task_ids="transform_dates")["begin_date"] }}' AND '{{ 
ti.xcom_pull(task_ids="transform_dates")["end_date"] }}'
+        """,
     )
     # [END ydb_operator_howto_guide_get_birth_date]
 
@@ -106,6 +128,7 @@ with DAG(
         >> populate_pet_table
         >> populate_pet_table_via_bulk_upsert()
         >> get_all_pets
+        >> transform_dates
         >> get_birth_date
     )
     # [END ydb_operator_howto_guide]

Reply via email to