Hi Andres, welcome to the community!

A few related points:
* I confirm, schedule_interval=None is the way to go
* Are you running a scheduler? the scheduler is also in charge of starting
tasks, even for externally triggered DagRun
* There's an operator that can emit dag runs, typically these triggers
would run on another DAG on a short `schedule_interval` (say 5 minutes) and
at each run evaluates whether is should trigger a DagRun or not, and pass
conf parameters if needed:
https://airflow.incubator.apache.org/code.html#airflow.operators.TriggerDagRunOperator
* you can also use the ORM to quick off new tasks programmatically (from
airflow.models import DagRun)
* you can also use the UI to create DagRuns, though since the `conf` is a
Pickle, you can't really see what it's in there in the UI. Would be a nice
feature to show `conf` in the UI

As to debugging your issue I'd just print `dag_run.conf` to see what's in
there. My wetware bash interpreter doesn't know whether the quoting in your
`-c` arg is ok or not...

Max

On Wed, Dec 14, 2016 at 1:23 PM, Andres Quiroz <
[email protected]> wrote:

> Hello,
>
> I am new to Airflow and the mailing list, so please feel free to
> re-direct me to the appropriate channel if this is not the right one
> for my questions. I will state the questions first, in order or
> importance, first, and then elaborate below. Thanks in advance.
>
> 1) Is setting "schedule_interval" to None in a DAG definition and
> using the "airflow trigger_dag" command from the CLI with the
> corresponding dag_id the correct way to trigger a DAG manually, so
> that all of its tasks are executed? If correct, are these conditions
> sufficient or is something missing?
>
> 2) How do I access the conf parameter passed to the trigger_dag
> command in a BashOperator? (i.e. airflow trigger_dag --conf
> '{"key":"value"}' dag_id)
>
> Also, please note I am a relative Python newbie, so I would really
> appreciate examples as opposed to just an explanation about the code.
> Here are the details for the questions:
>
> 1) I've defined a DAG for a workflow that will ONLY be triggered
> externally (i.e. manually). In other words, I do not intend to use the
> scheduler to run the workflow, or set any sort of scheduling interval
> for running or backfilling, etc. However, since the workflow is
> complex and long-running, all of the other functionality that Airflow
> provides, like initiating tasks with the correct dependencies and
> starting concurrent tasks if possible, keeping track of the state of
> each run in the database, etc., is desired.
>
> My basic understanding is that this is achievable by setting the
> schedule_interval in the DAG object to None, and then using the
> "airflow trigger_dag" command from the CLI. However, the observed
> behavior when doing this is that the state for the DAG does go to
> running (I can see it as "running" in the DAG runs page in the UI),
> but none of the tasks are actually started (the task instances table
> in the UI remains empty, and the state of the tasks doesn't change in
> the graph view). This happens both when running my own DAG as well as
> the example_trigger_target_dag.py example.
>
> The airflow configuration is set to use the LocalExecutor, and the
> database has been initiated. The full CLI command is the following:
>
> airflow trigger_dag -r run1 -c '{"batch":"test1"}' batch_test
>
> Here are the main parts of the DAG definition .py file:
>
> default_args = {
>     'owner': 'data-dev',
>     'depends_on_past': False,
>     'start_date': datetime(2016, 12, 13),
>     'email': ['[email protected]'],
>     'email_on_failure': False,
>     'email_on_retry': False,
>     'retries': 0,
>     'retry_delay': timedelta(minutes=5),
> }
>
> dag = DAG('batch_test', default_args=default_args, schedule_interval=None)
>
> bash_op = """
> echo 'Generating {{ dag_run.conf["batch"] }}_out'
> touch {{ dag_run.conf['batch'] }}_out.csv
> """
>
> t1 = BashOperator(
>     task_id='generate_output',
>     bash_command=bash_op,
>     params={},
>     dag=dag)
>
> 2) As you can see from the CLI command and code snippet, I am trying
> to pass a "batch" parameter for the DAG run, following the
> instructions in the documentation and some forum posts I have seen.
> However, this method is not working, since the dag_run object is empty
> (None) when the bash operator tries to access conf.
>
> I would also note that issues (1) and (2) are unrelated, because I
> tried the trigger_dag command on a workflow that doesn't use the conf
> parameter and on one that uses it, but handles the empty object case
> (i.e. example_trigger_target_dag.py). Both of them failed to start any
> tasks, but the latter did render the bash command, but with an empty
> message, which meant the dag_run object was also empty.
>
> Thanks again and apologies for the lengthy message; I have been
> working on this for some time without success.
>
> Regards,
>
> Andrés
>

Reply via email to