Mrhacktivist opened a new issue, #38623: URL: https://github.com/apache/airflow/issues/38623
### Apache Airflow Provider(s)

postgres

### Versions of Apache Airflow Providers

5.5.1

### Apache Airflow version

2.6.3

### Operating System

Amazon Linux 2

### Deployment

Amazon (AWS) MWAA

### Deployment details

MWAA 2.6.3

### What happened

When we iterate over a predefined list of values and pass each value into the query inside a function as the table-name parameter, the query is printed in the task logs. Code example:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook


# Define the function to execute a SQL query using PostgresHook
def run_stage_queries(table_name, **kwargs):
    print(table_name)
    print(records)
    load_type_res = pg_hook.get_records(
        "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
    print('Executing SQL query: {}'.format(load_type_res))
    print('&&&& ', load_type_res[0][0])
    load_type = load_type_res[0][0]


# Define the DAG
dag = DAG(
    dag_id="vaibhav",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    max_active_runs=1,
    catchup=False
)

# Fetching records within the context of the DAG
pg_hook = PostgresHook(postgres_conn_id='red')
sql_ods = "SELECT ods_table_name FROM public.test;"
records = ["vaibhav", "nikhil", "kunal"]
ods_table_list = []
for o_table in records:
    if o_table is not None:
        ods_table_list.append(o_table)

if len(ods_table_list) > 0:
    init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
    stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
    for tableName in ods_table_list:
        load_base_stg = PythonOperator(
            task_id=tableName + '_stage',
            op_kwargs={'table_name': tableName},
            python_callable=run_stage_queries,
            provide_context=True,
            dag=dag)
        init_parameters >> load_base_stg
        load_base_stg >> stage_Load_Success
```

With the predefined list above, the SQL query issued in `run_stage_queries()` is printed in the task logs by [sql.py](https://github.com/apache/airflow/blob/main/airflow/providers/common/sql/hooks/sql.py); the method responsible for it is:

```python
def _run_command(self, cur, sql_statement, parameters):
    """Run a statement using an already open cursor."""
    if self.log_sql:
        self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
```

Output:

```
[2024-03-26, 12:15:37 UTC] {{sql.py:374}} INFO - Running statement: SELECT * FROM public.test WHERE ods_table_name = 'vaibhav', parameters: None
[2024-03-26, 12:15:37 UTC] {{sql.py:383}} INFO - Rows affected: 1
```

Issue: if we instead fetch the records with a query at the top level, like `sql_ods = "SELECT ods_table_name FROM public.test;"` followed by `records = pg_hook.get_records(sql_ods)`, iterate through `records` into another variable, and then iterate through that variable to create a dynamic task per value, passing each value as the table name to `run_stage_queries()`, then the query is not printed in the task log.
Please find the code example and output below.

Code:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.postgres_hook import PostgresHook


# Define the function to execute a SQL query using PostgresHook
def run_stage_queries(table_name, **kwargs):
    print(table_name)
    print(records)
    print(ods_table_list)
    load_type_res = pg_hook.get_records(
        "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
    print('Executing SQL query: {}'.format(load_type_res))
    print('&&&& ', load_type_res[0][0])
    load_type = load_type_res[0][0]


# Define the DAG
dag = DAG(
    dag_id="vaibhavvvvv",
    schedule_interval="@daily",
    start_date=datetime(2023, 1, 1),
    max_active_runs=1,
    catchup=False
)

# Fetching records within the context of the DAG
pg_hook = PostgresHook(postgres_conn_id='red')
sql_ods = "SELECT ods_table_name FROM public.test;"
records = pg_hook.get_records(sql_ods)
ods_table_list = [item[0] for item in records]

if len(ods_table_list) > 0:
    init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
    stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
    for tableName in ods_table_list:
        load_base_stg = PythonOperator(
            task_id=tableName + '_stage',
            op_kwargs={'table_name': tableName},
            python_callable=run_stage_queries,
            provide_context=True,
            dag=dag)
        init_parameters >> load_base_stg
        load_base_stg >> stage_Load_Success
```

Output:

```
[2024-03-26, 12:16:30 UTC] {{standard_task_runner.py:85}} INFO - Job 52: Subtask vaibhav_stage
[2024-03-26, 12:16:30 UTC] {{taskinstance.py:1545}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='vaibhavvvvv' AIRFLOW_CTX_TASK_ID='vaibhav_stage' AIRFLOW_CTX_EXECUTION_DATE='2024-03-26T12:16:14.873032+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-26T12:16:14.873032+00:00'
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - vaibhav
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - [('vaibhav',), ('nikhil',), ('kunal',)]
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - ['vaibhav', 'nikhil', 'kunal']
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - Executing SQL query: [('vaibhav',)]
[2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - &&&& vaibhav
[2024-03-26, 12:16:30 UTC] {{python.py:183}} INFO - Done. Returned value was: None
```

As the output above shows, the query is not printed in the task logs. Why is the query not logged when the dynamically built list is passed?

### What you think should happen instead

When we build the list dynamically by executing a query, iterate through it, and pass each value as the table name into the query inside the function, the task log should contain the `Running statement: <the query>` line.

### How to reproduce

Build the dynamic list as below:

```python
pg_hook = PostgresHook(postgres_conn_id='red')
sql_ods = "SELECT ods_table_name FROM public.test;"
records = pg_hook.get_records(sql_ods)
ods_table_list = [item[0] for item in records]
```

Create the dynamic tasks:

```python
if len(ods_table_list) > 0:
    init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
    stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
    for tableName in ods_table_list:
        load_base_stg = PythonOperator(
            task_id=tableName + '_stage',
            op_kwargs={'table_name': tableName},
            python_callable=run_stage_queries,
            provide_context=True,
            dag=dag)
        init_parameters >> load_base_stg
        load_base_stg >> stage_Load_Success
```

which call the method:

```python
def run_stage_queries(table_name, **kwargs):
    print(table_name)
    print(records)
    print(ods_table_list)
    load_type_res = pg_hook.get_records(
        "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
    print('Executing SQL query: {}'.format(load_type_res))
    print('&&&& ', load_type_res[0][0])
    load_type = load_type_res[0][0]
```

### Anything else

_No response_

### Are you willing to submit PR?

- [ ] Yes I am willing to submit a PR!

### Code of Conduct

- [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@airflow.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
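A side note on the reproduction code, unrelated to the missing log line: interpolating `table_name` with `str.format()` is SQL-injection-prone. `DbApiHook.get_records()` accepts a `parameters` argument (with psycopg2-style `%s` placeholders for Postgres, e.g. `pg_hook.get_records("SELECT * FROM public.test WHERE ods_table_name = %s", parameters=(table_name,))`), and those parameters also show up in the `parameters:` part of the `Running statement` log line. A minimal stand-in sketch of parameter binding using sqlite3, so it runs without Postgres; the table and values mirror the example above:

```python
import sqlite3

# In-memory stand-in for the public.test table from the example above.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (ods_table_name TEXT)")
conn.executemany("INSERT INTO test VALUES (?)",
                 [("vaibhav",), ("nikhil",), ("kunal",)])

table_name = "vaibhav"
# Bind the value as a parameter instead of "... = '{}'".format(table_name):
# the driver escapes it, and the hook would log it alongside the statement.
rows = conn.execute(
    "SELECT * FROM test WHERE ods_table_name = ?", (table_name,)
).fetchall()
print(rows)  # [('vaibhav',)]
```

sqlite3 uses `?` placeholders where psycopg2 uses `%s`; the binding mechanism is the same idea.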