Kengo Seki created AIRFLOW-1674:
-----------------------------------

             Summary: Fix TriggerDagRunOperator to pass execution_date to the target
                 Key: AIRFLOW-1674
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-1674
             Project: Apache Airflow
          Issue Type: Bug
          Components: operators
            Reporter: Kengo Seki

I ran example_trigger_target_dag via example_trigger_controller_dag with SQLite as the backend. example_trigger_target_dag has two tasks, run_this and bash_task. The former failed with the following error:

{code}
[2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:13,554] {models.py:1563} ERROR - 'NoneType' object has no attribute 'conf'
[2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/models.py", line 1461, in _run_raw_task
[2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py", line 89, in execute
[2017-10-03 10:31:13,555] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py", line 94, in execute_callable
[2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/example_dags/example_trigger_target_dag.py", line 51, in run_this_func
[2017-10-03 10:31:13,556] {base_task_runner.py:98} INFO - Subtask: print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
[2017-10-03 10:31:13,557] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'conf'
[2017-10-03 10:31:13,557] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:13,555] {models.py:1592} INFO - Marking task as FAILED.
[2017-10-03 10:31:13,567] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:13,567] {models.py:1612} ERROR - 'NoneType' object has no attribute 'conf'
[2017-10-03 10:31:13,567] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/.virtualenvs/a/bin/airflow", line 6, in <module>
[2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask: exec(compile(open(__file__).read(), __file__, 'exec'))
[2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/bin/airflow", line 27, in <module>
[2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask: args.func(args)
[2017-10-03 10:31:13,568] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/bin/cli.py", line 394, in run
[2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask: pool=args.pool,
[2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/utils/db.py", line 50, in wrapper
[2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask: result = func(*args, **kwargs)
[2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/models.py", line 1461, in _run_raw_task
[2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask: result = task_copy.execute(context=context)
[2017-10-03 10:31:13,569] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py", line 89, in execute
[2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: return_value = self.execute_callable()
[2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/operators/python_operator.py", line 94, in execute_callable
[2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: return self.python_callable(*self.op_args, **self.op_kwargs)
[2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: File "/home/sekikn/dev/incubator-airflow/airflow/example_dags/example_trigger_target_dag.py", line 51, in run_this_func
[2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: print("Remotely received value of {} for key=message".format(kwargs['dag_run'].conf['message']))
[2017-10-03 10:31:13,570] {base_task_runner.py:98} INFO - Subtask: AttributeError: 'NoneType' object has no attribute 'conf'
{code}

The latter appeared to succeed, but the expected message was not actually passed:

{code}
[2017-10-03 10:31:07,643] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 10:31:07,643] {bash_operator.py:95} INFO - Here is the message:
{code}

The cause seems to be that the execution_date values differ between the example_trigger_target_dag run and its task instances (the dag_run value has no fractional seconds):

{code}
sqlite> select dag_id, execution_date from dag_run where dag_id='example_trigger_target_dag';
example_trigger_target_dag|2017-10-03 14:31:00
sqlite> select task_id, execution_date from task_instance where dag_id='example_trigger_target_dag';
bash_task|2017-10-03 14:31:00.000000
run_this|2017-10-03 14:31:00.000000
{code}

Passing execution_date to DAG#create_dagrun in TriggerDagRunOperator#execute will resolve this problem, as Justin shows in the following mail:

https://lists.apache.org/thread.html/e369f7f7c3dbe0b673a785f939d5259b45da5cdb923cb3415aa7e410@%3Cdev.airflow.apache.org%3E
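
The suspected mismatch can be reproduced outside Airflow with a minimal sketch using only Python's sqlite3 module. It is an illustration, not Airflow's code: the two-table schema is a simplified assumption, find_dag_run is a hypothetical helper, and the lookup assumes the two execution_date values are compared as the text shown in the sqlite output above. Under those assumptions, a task instance cannot find its DAG run until both rows carry the same execution_date, which is what passing execution_date through to the run would guarantee.

```python
import sqlite3

DAG_ID = "example_trigger_target_dag"

# Simplified stand-ins for Airflow's tables (assumed schema, TEXT columns only).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT, conf TEXT);
    CREATE TABLE task_instance (task_id TEXT, dag_id TEXT, execution_date TEXT);
""")

# execution_date values copied from the sqlite output in this report.
conn.execute("INSERT INTO dag_run VALUES (?, ?, ?)",
             (DAG_ID, "2017-10-03 14:31:00", "message"))
conn.execute("INSERT INTO task_instance VALUES (?, ?, ?)",
             ("run_this", DAG_ID, "2017-10-03 14:31:00.000000"))


def find_dag_run(task_id):
    """Hypothetical helper: return the conf of the dag_run whose dag_id and
    execution_date equal the task instance's, or None if there is no match."""
    row = conn.execute(
        """
        SELECT dr.conf
        FROM task_instance ti
        JOIN dag_run dr
          ON dr.dag_id = ti.dag_id
         AND dr.execution_date = ti.execution_date
        WHERE ti.task_id = ?
        """,
        (task_id,),
    ).fetchone()
    return row[0] if row else None


# The two strings differ, so no run is found (compare the 'NoneType' error).
before = find_dag_run("run_this")

# Give the run exactly the same execution_date as its task instance.
conn.execute("UPDATE dag_run SET execution_date = ? WHERE dag_id = ?",
             ("2017-10-03 14:31:00.000000", DAG_ID))
after = find_dag_run("run_this")

print(before, after)
```

Here before is None and after is the stored conf value, mirroring how kwargs['dag_run'] was None in run_this.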