Hi Andres, welcome to the community! A few related points:

* I confirm, schedule_interval=None is the way to go
* Are you running a scheduler? The scheduler is also in charge of starting tasks, even for externally triggered DagRuns
* There's an operator that can emit dag runs; typically these triggers run in another DAG on a short `schedule_interval` (say 5 minutes), and at each run the operator evaluates whether it should trigger a DagRun or not, and can pass conf parameters if needed (rough sketch below): https://airflow.incubator.apache.org/code.html#airflow.operators.TriggerDagRunOperator
* you can also use the ORM to kick off new DagRuns programmatically (from airflow.models import DagRun) -- also sketched below
* you can also use the UI to create DagRuns, though since the `conf` is a Pickle, you can't really see what's in there in the UI. Would be a nice feature to show `conf` in the UI
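To make the TriggerDagRunOperator point concrete, here's a rough, untested sketch of a small "controller" DAG along the lines of example_trigger_target_dag's companion controller example. It assumes the 1.x API, where the python_callable receives (context, dag_run_obj) and the trigger only fires if you return the dag_run_obj; the DAG ids, the /tmp/batch_ready condition and the payload key are made up for illustration, and depending on your version the import may be `from airflow.operators import TriggerDagRunOperator` instead:

    # controller sketch -- polls every 5 minutes and decides whether to fire batch_test
    import os
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dagrun_operator import TriggerDagRunOperator

    dag = DAG(
        'batch_test_controller',          # made-up name
        start_date=datetime(2016, 12, 13),
        schedule_interval='*/5 * * * *')  # poll every 5 minutes


    def maybe_trigger(context, dag_run_obj):
        # replace this with whatever "should we run a batch?" means for you
        if os.path.exists('/tmp/batch_ready'):
            # whatever you put on .payload comes out as dag_run.conf in the target DAG
            dag_run_obj.payload = {'batch': 'test1'}
            return dag_run_obj
        # returning None means: don't trigger anything this time around


    trigger = TriggerDagRunOperator(
        task_id='maybe_trigger_batch_test',
        trigger_dag_id='batch_test',      # your externally triggered DAG
        python_callable=maybe_trigger,
        dag=dag)

And the ORM route -- roughly what `airflow trigger_dag` does under the hood (again just a sketch, kwarg names may differ a bit between versions):

    # sketch: trigger 'batch_test' programmatically via the ORM
    from datetime import datetime

    from airflow import settings
    from airflow.models import DagRun
    from airflow.utils.state import State

    session = settings.Session()
    execution_date = datetime.utcnow()
    run = DagRun(
        dag_id='batch_test',
        run_id='manual__' + execution_date.isoformat(),
        execution_date=execution_date,
        state=State.RUNNING,
        conf={'batch': 'test1'},   # this is what ends up pickled as dag_run.conf
        external_trigger=True)
    session.add(run)
    session.commit()
    session.close()
    # a running scheduler then picks up the DagRun and starts queuing its tasks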
As to debugging your issue, I'd just print `dag_run.conf` to see what's in there. My wetware bash interpreter doesn't know whether the quoting in your `-c` arg is ok or not...

Max

On Wed, Dec 14, 2016 at 1:23 PM, Andres Quiroz <[email protected]> wrote:

> Hello,
>
> I am new to Airflow and the mailing list, so please feel free to
> re-direct me to the appropriate channel if this is not the right one
> for my questions. I will state the questions first, in order of
> importance, and then elaborate below. Thanks in advance.
>
> 1) Is setting "schedule_interval" to None in a DAG definition and
> using the "airflow trigger_dag" command from the CLI with the
> corresponding dag_id the correct way to trigger a DAG manually, so
> that all of its tasks are executed? If correct, are these conditions
> sufficient or is something missing?
>
> 2) How do I access the conf parameter passed to the trigger_dag
> command in a BashOperator? (i.e. airflow trigger_dag --conf
> '{"key":"value"}' dag_id)
>
> Also, please note I am a relative Python newbie, so I would really
> appreciate examples as opposed to just an explanation about the code.
> Here are the details for the questions:
>
> 1) I've defined a DAG for a workflow that will ONLY be triggered
> externally (i.e. manually). In other words, I do not intend to use the
> scheduler to run the workflow, or set any sort of scheduling interval
> for running or backfilling, etc. However, since the workflow is
> complex and long-running, all of the other functionality that Airflow
> provides, like initiating tasks with the correct dependencies and
> starting concurrent tasks if possible, keeping track of the state of
> each run in the database, etc., is desired.
>
> My basic understanding is that this is achievable by setting the
> schedule_interval in the DAG object to None, and then using the
> "airflow trigger_dag" command from the CLI. However, the observed
> behavior when doing this is that the state for the DAG does go to
> running (I can see it as "running" in the DAG runs page in the UI),
> but none of the tasks are actually started (the task instances table
> in the UI remains empty, and the state of the tasks doesn't change in
> the graph view). This happens both when running my own DAG as well as
> the example_trigger_target_dag.py example.
>
> The airflow configuration is set to use the LocalExecutor, and the
> database has been initialized. The full CLI command is the following:
>
> airflow trigger_dag -r run1 -c '{"batch":"test1"}' batch_test
>
> Here are the main parts of the DAG definition .py file:
>
> default_args = {
>     'owner': 'data-dev',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 12, 13),
>     'email': ['[email protected]'],
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
> }
>
> dag = DAG('batch_test', default_args=default_args, schedule_interval=None)
>
> bash_op = """
> echo 'Generating {{ dag_run.conf["batch"] }}_out'
> touch {{ dag_run.conf['batch'] }}_out.csv
> """
>
> t1 = BashOperator(
>     task_id='generate_output',
>     bash_command=bash_op,
>     params={},
>     dag=dag)
>
> 2) As you can see from the CLI command and code snippet, I am trying
> to pass a "batch" parameter for the DAG run, following the
> instructions in the documentation and some forum posts I have seen.
> However, this method is not working, since the dag_run object is empty
> (None) when the bash operator tries to access conf.
>
> I would also note that issues (1) and (2) are unrelated, because I
> tried the trigger_dag command on a workflow that doesn't use the conf
> parameter and on one that uses it but handles the empty object case
> (i.e. example_trigger_target_dag.py). Both of them failed to start any
> tasks; the latter did render the bash command, though with an empty
> message, which meant the dag_run object was also empty.
>
> Thanks again and apologies for the lengthy message; I have been
> working on this for some time without success.
>
> Regards,
>
> Andrés
>
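PS -- re (2) and "just print dag_run.conf": the quickest sanity check is a throwaway task that echoes the whole object rather than indexing into it, so you see exactly what the template receives (if dag_run is None you'll see that right away, whereas indexing into conf can just come out blank or blow up the template). A sketch only, meant to be dropped into your existing batch_test file where `dag` and BashOperator are already defined:

    # debugging task: dump dag_run and dag_run.conf into the task log
    debug = BashOperator(
        task_id='show_conf',
        bash_command=(
            'echo "dag_run is: {{ dag_run }}"; '
            'echo "dag_run.conf is: {{ dag_run.conf }}"'),
        dag=dag)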
