Don't you need to preserve the task objects? Your implementation overwrites each task with its successor, so only the last one would be kept, despite your print statements. Try building a list or dict of tasks, like:

    tasks = []  # only at the top

    for file in glob('dags/snowsql/create/udf/*.sql'):
        print("FILE {}".format(file))
        tasks.append(
            create_snowflake_operator(file, dag, 'snowflake_default')
        )
        tasks[-1].set_upstream(start)

On Fri, Sep 14, 2018 at 17:20 Frank Maritato <fmarit...@opentable.com.invalid> wrote:

> Ok, my mistake. I thought that command was querying the server for its
> information and not just looking in a directory relative to where it is
> being run. I have it working now. Thanks Chris and Sai!
>
> On 9/14/18, 9:58 AM, "Chris Palmer" <ch...@crpalmer.com> wrote:
>
> > The relative paths might work from where ever you are evoking
> > 'airflow list_tasks', but that doesn't mean they work from wherever
> > the webserver is parsing the dags from.
> >
> > Does running 'airflow list_tasks' from some other running directory
> > work?
> >
> > On Fri, Sep 14, 2018 at 12:35 PM Frank Maritato
> > <fmarit...@opentable.com.invalid> wrote:
> >
> > > Do you mean give the full path to the files? The relative path I'm
> > > using definitely works. When I type airflow list_dags, I can see
> > > the output from the print statements that the glob is finding my
> > > sql files and creating the snowflake operators.
> > >
> > > airflow list_tasks workflow also lists all the operators I'm
> > > creating. I'm just not seeing them in the ui.
> > >
> > > On 9/14/18, 9:10 AM, "Sai Phanindhra" <phani8...@gmail.com> wrote:
> > >
> > > > Hi frank,
> > > > Can you try giving global paths?
> > > >
> > > > On Fri 14 Sep, 2018, 21:35 Frank Maritato,
> > > > <fmarit...@opentable.com.invalid> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I'm using apache airflow 1.10.0 and I'm trying to dynamically
> > > > > generate some tasks in my dag based on files that are in the
> > > > > dags directory. The problem is, I don't see these tasks in the
> > > > > ui, I just see the 'start' dummy operator. If I type
> > > > > 'airflow list_tasks workflow', they are listed. Thoughts?
> > > > >
> > > > > Here is how I'm generating the tasks:
> > > > >
> > > > > def create_snowflake_operator(file, dag, snowflake_connection):
> > > > >     file_repl = file.replace('/', '_')
> > > > >     file_repl = file_repl.replace('.sql', '')
> > > > >     print("TASK_ID {}".format(file_repl))
> > > > >     return SnowflakeOperator(
> > > > >         dag=dag,
> > > > >         task_id='create_{}'.format(file_repl),
> > > > >         snowflake_conn_id=snowflake_connection,
> > > > >         sql=file
> > > > >     )
> > > > >
> > > > > DAG_NAME = 'create_objects'
> > > > > dag = DAG(
> > > > >     DAG_NAME,
> > > > >     default_args=args,
> > > > >     dagrun_timeout=timedelta(hours=2),
> > > > >     schedule_interval=None,
> > > > > )
> > > > >
> > > > > start = DummyOperator(
> > > > >     dag=dag,
> > > > >     task_id="start",
> > > > > )
> > > > >
> > > > > print("creating snowflake operators")
> > > > >
> > > > > for file in glob('dags/snowsql/create/udf/*.sql'):
> > > > >     print("FILE {}".format(file))
> > > > >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> > > > >     task.set_upstream(start)
> > > > >
> > > > > for file in glob('dags/snowsql/create/table/*.sql'):
> > > > >     print("FILE {}".format(file))
> > > > >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> > > > >     task.set_upstream(start)
> > > > >
> > > > > for file in glob('dags/snowsql/create/view/*.sql'):
> > > > >     print("FILE {}".format(file))
> > > > >     task = create_snowflake_operator(file, dag, 'snowflake_default')
> > > > >     task.set_upstream(start)
> > > > >
> > > > > print("done {}".format(start.downstream_task_ids))
> > > > >
> > > > > Thanks in advance
> > > > > --
> > > > > Frank Maritato
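
For what it's worth, the relative-path problem Chris describes in the quoted thread can be sidestepped by resolving the glob against a fixed base directory (for example, the directory of the DAG file) rather than the working directory of whichever process happens to parse the DAG. Below is a minimal sketch of that idea, assuming the snowsql/create/<kind>/ layout from Frank's message sits under that base directory. The helper names find_sql_files and make_task_id are hypothetical and are not part of Airflow.

```python
import os
from glob import glob


def find_sql_files(base_dir, kind):
    """Return sorted absolute paths matching base_dir/snowsql/create/<kind>/*.sql.

    base_dir is an assumption: typically the directory containing the DAG file.
    """
    pattern = os.path.join(
        os.path.abspath(base_dir), 'snowsql', 'create', kind, '*.sql'
    )
    return sorted(glob(pattern))


def make_task_id(path, base_dir):
    """Build a task id from the file's path relative to base_dir."""
    rel = os.path.relpath(path, os.path.abspath(base_dir))
    stem, _ext = os.path.splitext(rel)  # drop the '.sql' suffix
    return 'create_{}'.format(stem.replace(os.sep, '_'))


# Assumed usage inside the DAG file (not verified against a live Airflow):
#
# DAG_DIR = os.path.dirname(os.path.abspath(__file__))
# for kind in ('udf', 'table', 'view'):
#     for path in find_sql_files(DAG_DIR, kind):
#         print("FILE {} -> {}".format(path, make_task_id(path, DAG_DIR)))
```

Because the pattern is built from an absolute base directory, the same files are found no matter where the command is run from. Which path to hand to the operator's sql argument (absolute, or relative to the DAG folder) likely depends on how Airflow resolves templated .sql files, so treat that part as something to check in your own setup.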