Mrhacktivist opened a new issue, #38623:
URL: https://github.com/apache/airflow/issues/38623

   ### Apache Airflow Provider(s)
   
   postgres
   
   ### Versions of Apache Airflow Providers
   
   5.5.1
   
   ### Apache Airflow version
   
   2.6.3
   
   ### Operating System
   
   amazon linux 2
   
   ### Deployment
   
   Amazon (AWS) MWAA
   
   ### Deployment details
   
   MWAA 2.6.3 
   
   ### What happened
   
   When we pass a predefined list of variables to a loop, iterate through it, and pass each value to the query inside a function as a table-name parameter, the query is printed in the task logs.
   
   code example -
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.hooks.postgres_hook import PostgresHook
   
   #Define the function to execute SQL script using PostgresHook
   def run_stage_queries(table_name, **kwargs):
       print(table_name)
       print(records)
       load_type_res = pg_hook.get_records(
           "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
       print('Executing SQL query: {}'.format(load_type_res))
       print('&&&& ', load_type_res[0][0])
       load_type = load_type_res[0][0]
   
   #Define the DAG
   dag = DAG(
       dag_id="vaibhav",
       schedule_interval="@daily",
       start_date=datetime(2023, 1, 1),
       max_active_runs=1,
       catchup=False
   )
   
   #Fetching records within the context of the DAG
   pg_hook = PostgresHook(postgres_conn_id='red')
   
   sql_ods = "SELECT ods_table_name FROM public.test;"
   records = ["vaibhav","nikhil","kunal"]
   ods_table_list = []
   for o_table in records:
       if o_table is not None:
           ods_table_list.append(o_table)
   
   if len(ods_table_list) > 0:
       init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
       stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
       for tableName in ods_table_list:
           load_base_stg = PythonOperator(
               task_id=tableName + '_stage',
               op_kwargs={'table_name': tableName},
               python_callable=run_stage_queries,
               provide_context=True,
               dag=dag)
           init_parameters >> load_base_stg
           load_base_stg >> stage_Load_Success
   
   ```
   As we can see in the example above, we passed a predefined list of variables, and it produced the output below, where the SQL query used in the run_stage_queries() method is printed in the logs by [sql.py](https://github.com/apache/airflow/blob/main/airflow/providers/common/sql/hooks/sql.py). The method responsible for this is:
   ```
    def _run_command(self, cur, sql_statement, parameters):
        """Run a statement using an already open cursor."""
        if self.log_sql:
            self.log.info("Running statement: %s, parameters: %s", sql_statement, parameters)
   
   ```
   Output:
   ```
    [2024-03-26, 12:15:37 UTC] {{sql.py:374}} INFO - Running statement: SELECT * FROM public.test WHERE ods_table_name = 'vaibhav', parameters: None
    [2024-03-26, 12:15:37 UTC] {{sql.py:383}} INFO - Rows affected: 1
   ```
   
   Issue -
   If we instead fetch the records with a query at the top level, like
   `sql_ods = "SELECT ods_table_name FROM public.test;"` and
   `records = pg_hook.get_records(sql_ods)`, iterate through `records` and store the values in another variable, then iterate through that variable to create a dynamic task per value and pass each value to run_stage_queries() as the table name, the query is not returned in the task log. Please find the code example and output below:
   
   code - 
   ```
   from datetime import datetime
   from airflow import DAG
   from airflow.operators.dummy_operator import DummyOperator
   from airflow.operators.python_operator import PythonOperator
   from airflow.hooks.postgres_hook import PostgresHook
   
   #Define the function to execute SQL script using PostgresHook
   def run_stage_queries(table_name, **kwargs):
       print(table_name)
       print(records)
       print(ods_table_list)
       load_type_res = pg_hook.get_records(
           "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
       print('Executing SQL query: {}'.format(load_type_res))
       print('&&&& ', load_type_res[0][0])
       load_type = load_type_res[0][0]
   
   #Define the DAG
   dag = DAG(
       dag_id="vaibhavvvvv",
       schedule_interval="@daily",
       start_date=datetime(2023, 1, 1),
       max_active_runs=1,
       catchup=False
   )
   
   #Fetching records within the context of the DAG
   pg_hook = PostgresHook(postgres_conn_id='red')
   
   sql_ods = "SELECT ods_table_name FROM public.test;"
   records = pg_hook.get_records(sql_ods)
   ods_table_list = [item[0] for item in records]
   
   
   if len(ods_table_list) > 0:
       init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
       stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
       for tableName in ods_table_list:
           load_base_stg = PythonOperator(
               task_id=tableName + '_stage',
               op_kwargs={'table_name': tableName},
               python_callable=run_stage_queries,
               provide_context=True,
               dag=dag)
           init_parameters >> load_base_stg
           load_base_stg >> stage_Load_Success
   ```
   
   Output :
   ```
    [2024-03-26, 12:16:30 UTC] {{standard_task_runner.py:85}} INFO - Job 52: Subtask vaibhav_stage
    [2024-03-26, 12:16:30 UTC] {{taskinstance.py:1545}} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='airflow' AIRFLOW_CTX_DAG_ID='vaibhavvvvv' AIRFLOW_CTX_TASK_ID='vaibhav_stage' AIRFLOW_CTX_EXECUTION_DATE='2024-03-26T12:16:14.873032+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='manual__2024-03-26T12:16:14.873032+00:00'
    [2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - vaibhav
    [2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - [('vaibhav',), ('nikhil',), ('kunal',)]
    [2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - ['vaibhav', 'nikhil', 'kunal']
    [2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - Executing SQL query: [('vaibhav',)]
    [2024-03-26, 12:16:30 UTC] {{logging_mixin.py:150}} INFO - &&&&  vaibhav
    [2024-03-26, 12:16:30 UTC] {{python.py:183}} INFO - Done. Returned value was: None
   ```
   
   As we can see in the output above, the query is not printed in the task logs. Why is the query not logged when the dynamic list is passed?
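One possibility worth ruling out (a guess on my part, not a confirmed diagnosis): the `Running statement:` line is emitted via `self.log.info(...)`, so it disappears whenever the effective level of the hook's logger is above INFO in that task's context. A self-contained stdlib illustration of the effect, using a made-up logger name:

```python
import io
import logging

# Hypothetical logger name; this only illustrates level filtering,
# not Airflow's actual logging configuration.
stream = io.StringIO()
log = logging.getLogger("demo.sql_hook")
log.addHandler(logging.StreamHandler(stream))
log.propagate = False

log.setLevel(logging.INFO)
log.info("Running statement: SELECT 1, parameters: None")  # captured

log.setLevel(logging.WARNING)
log.info("Running statement: SELECT 2, parameters: None")  # filtered out

print(stream.getvalue())  # only the SELECT 1 line appears
```

If the level is the culprit, the first scenario and the second would differ only in how their task loggers are configured, not in the hook code itself.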
   
   ### What you think should happen instead
   
   When we pass each value from the dynamically created list (built by executing a query and iterating through the results) to the query inside the method as a table name, the task logs should contain the line `Running statement: <the query>`.
   
   ### How to reproduce
   
   Pass the dynamic list as below:
   
   ```
   pg_hook = PostgresHook(postgres_conn_id='red')
   
   sql_ods = "SELECT ods_table_name FROM public.test;"
   records = pg_hook.get_records(sql_ods)
   ods_table_list = [item[0] for item in records]
   ```
   
   Create the tasks dynamically:
   
   ```
   if len(ods_table_list) > 0:
       init_parameters = DummyOperator(task_id='Load_base_tables', dag=dag)
       stage_Load_Success = DummyOperator(task_id='Stage_Load_Success', dag=dag)
       for tableName in ods_table_list:
           load_base_stg = PythonOperator(
               task_id=tableName + '_stage',
               op_kwargs={'table_name': tableName},
               python_callable=run_stage_queries,
               provide_context=True,
               dag=dag)
           init_parameters >> load_base_stg
           load_base_stg >> stage_Load_Success
   ```
   
   passing each value to the method -
   
   ```
   def run_stage_queries(table_name, **kwargs):
       print(table_name)
       print(records)
       print(ods_table_list)
       load_type_res = pg_hook.get_records(
           "SELECT * FROM public.test WHERE ods_table_name = '{}'".format(table_name))
       print('Executing SQL query: {}'.format(load_type_res))
       print('&&&& ', load_type_res[0][0])
       load_type = load_type_res[0][0]
   ```
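Separate from the logging question: interpolating `table_name` with `.format()` builds the SQL by string concatenation, which is injection-prone; the common.sql hooks accept a `parameters` argument for binding values instead (with psycopg2 the placeholder is `%s`). Since Postgres isn't runnable here, this is a stdlib `sqlite3` sketch of the same pattern (`?` placeholders), not the hook itself:

```python
import sqlite3

# Stand-in for public.test; sqlite3 is used only so the sketch is runnable.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE test (ods_table_name TEXT)")
conn.executemany("INSERT INTO test VALUES (?)",
                 [("vaibhav",), ("nikhil",), ("kunal",)])

# Bind the value instead of formatting it into the SQL string.
# With PostgresHook this would be roughly:
#   pg_hook.get_records("SELECT * FROM public.test WHERE ods_table_name = %s",
#                       parameters=("vaibhav",))
rows = conn.execute(
    "SELECT * FROM test WHERE ods_table_name = ?", ("vaibhav",)
).fetchall()
print(rows)  # [('vaibhav',)]
```

Bound parameters would also show up in the `Running statement: %s, parameters: %s` log line instead of `parameters: None`.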
   
   ### Anything else
   
   _No response_
   
   ### Are you willing to submit PR?
   
   - [ ] Yes I am willing to submit a PR!
   
   ### Code of Conduct
   
   - [X] I agree to follow this project's [Code of Conduct](https://github.com/apache/airflow/blob/main/CODE_OF_CONDUCT.md)
   

