Hi Maxime,

thanks for the link you posted.

The example provided in the documentation on dynamically adding DAGs does
not work for us - even bearing in mind the addendum you pointed to.

Our setup is as follows:

We deploy the webserver, the scheduler and four workers (using the Celery
executor), each in its own Docker container (all containers are
connected via the same Docker network).

The dag bag directory pointed to by `dags_folder` in airflow.cfg contains
one Python module that implements a DAG factory:

The DAG factory fetches DAG parameters from a database and instantiates the
appropriate DAG object (complete with all tasks).
Doing `globals()[dag_id] = dag_object` in the DAG factory code has no
effect, i.e. the `dag_object` neither appears in our webserver nor gets
executed.
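To make the expected mechanism concrete, here is a minimal, Airflow-free sketch. It assumes (as the `globals()` approach implies) that a DagBag-style loader only picks up DAG objects bound in a module's global namespace, and that every process parsing `dags_folder` (webserver, scheduler, workers) imports the factory module on its own. `StubDAG`, `fetch_dag_params`, `build_dag`, `load_module` and `collect_dags` are hypothetical names for illustration, and the hard-coded parameter list stands in for our database query.

```python
import types


class StubDAG:
    """Hypothetical stand-in for airflow.models.DAG (illustration only)."""

    def __init__(self, dag_id, schedule_interval=None):
        self.dag_id = dag_id
        self.schedule_interval = schedule_interval


# Hypothetical contents of the DAG factory module in dags_folder.
FACTORY_SOURCE = '''
def fetch_dag_params():
    # Placeholder for the database query (assumption: one dict per DAG).
    return [
        {"dag_id": "factory_dag_a", "schedule_interval": "@daily"},
        {"dag_id": "factory_dag_b", "schedule_interval": "@hourly"},
    ]


def build_dag(params):
    # The real factory would also attach all tasks here.
    return StubDAG(params["dag_id"], schedule_interval=params["schedule_interval"])


for _params in fetch_dag_params():
    # Binding the DAG to a module-level name is what makes it discoverable.
    globals()[_params["dag_id"]] = build_dag(_params)

# Built but never bound to a module-level name, so a namespace scan cannot see it.
build_dag({"dag_id": "unregistered_dag", "schedule_interval": None})
'''


def load_module(source, name="dag_factory"):
    """Execute the file's source as a fresh module, as each parsing process would."""
    module = types.ModuleType(name)
    module.StubDAG = StubDAG  # make the stub class visible to the module code
    exec(source, module.__dict__)
    return module


def collect_dags(module):
    """Simplified DagBag-style scan: keep every DAG object in the module namespace."""
    return {
        obj.dag_id: obj
        for obj in list(module.__dict__.values())
        if isinstance(obj, StubDAG)
    }


# Each process parses dags_folder independently, so each one re-runs the factory.
for process in ("webserver", "scheduler", "worker"):
    bag = collect_dags(load_module(FACTORY_SOURCE))
    print(process, sorted(bag))
```

Every printed line lists only `factory_dag_a` and `factory_dag_b`; `unregistered_dag` never shows up. Under these assumptions, a factory that only runs inside one worker would leave the webserver's and scheduler's own scans empty.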

The DAG factory code is executed in one of the four aforementioned workers.

I have attempted some DAG factory code that adds the DAG object to the
Airflow database; however, all I end up with are the corresponding `dag_id`s
listed in the webserver as "unavailable":

from datetime import datetime

from airflow.jobs import SchedulerJob
from airflow.models import DAG, DagBag, DagModel, TaskInstance
from airflow.utils.db import provide_session
from airflow.utils.state import State

(... DAG factory class code ...)
    @provide_session
    def _add_dag_to_airflow(self, dag, session=None):
        # `dag` is the DAG object instantiated by the DAG factory
        for attribute, default in [('is_subdag', False)]:
            dag = self._add_dag_attribute(dag, attribute, default)

        # Register the DAG in the metadata database and store a pickle of it
        DAG.sync_to_db(dag, dag.owner, datetime.utcnow())
        dag_pickle = dag.pickle()

        dag_model = (
            session.query(DagModel)
            .filter(DagModel.dag_id == dag.dag_id)
            .first()
        )
        dag_model.pickle_id = dag_pickle.id
        dag_model.is_paused = False

        session.merge(dag_model)
        session.commit()

        # Wrap the DAG in a DagBag so the scheduler logic can process it
        bag = DagBag('', include_examples=False)
        bag.bag_dag(dag, None, None)

        # Let the scheduler decide which task instances are due
        queue = []
        job = SchedulerJob(dag_id=dag.dag_id)
        job._process_dags(bag, [dag], queue)

        for dag_id, task_id, execution_date in queue:
            task = dag.get_task(task_id)

            task_instance = TaskInstance(task, execution_date)
            task_instance.state = State.SCHEDULED

            session.merge(task_instance)
            session.commit()
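A simplified model of the symptom, under an assumption we have not verified in the Airflow source: the webserver lists every `dag_id` it finds in the metadata database, but can only render those present in the DagBag it parsed itself, and flags the rest as unavailable. `classify_dag_ids` and the DAG ids below are hypothetical.

```python
def classify_dag_ids(db_dag_ids, webserver_dagbag):
    """Split dag_ids found in the metadata DB by whether the webserver's own
    DagBag (the DAGs it parsed from dags_folder) contains them."""
    available = sorted(d for d in db_dag_ids if d in webserver_dagbag)
    unavailable = sorted(d for d in db_dag_ids if d not in webserver_dagbag)
    return available, unavailable


# Hypothetical scenario: a worker wrote "factory_dag_a" to the DB via
# sync_to_db, but the webserver's own parse of dags_folder never produced it.
db_dag_ids = ["factory_dag_a", "static_dag"]
webserver_dagbag = {"static_dag": object()}
available, unavailable = classify_dag_ids(db_dag_ids, webserver_dagbag)
print("available:", available)
print("unavailable:", unavailable)
```

In this model, writing rows to the database from a worker is not enough; the webserver's own parse has to yield the DAG too.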



Any help on this workflow would be greatly appreciated!


Best,

Georg


On Tue, Nov 22, 2016 at 10:46 PM, Maxime Beauchemin <
maximebeauche...@gmail.com> wrote:

> I added a paragraph to the FAQ entry: "How can I create DAGs dynamically?"
>
> The GH link above points straight to that paragraph. It's related to an
> issue that was discussed on this mailing list last week.
>
> I agree that working examples are ideal, and they can help with test
> coverage as well.
>
> Max
>
> On Tue, Nov 22, 2016 at 1:10 PM, siddharth anand <san...@apache.org>
> wrote:
>
> > Hi Max,
> > Which part in the above PR is related to dynamic DAGs?
> >
> > When thinking about adding documentation about functionality, I propose the
> > community bias towards adding working examples and test coverage. We offer
> > a quick start (which, by the way, needs some updates - for example, why
> > does it not start airflow-scheduler after starting the webserver?), but then
> > folks get stuck on how to write DAGs and use the full range of Airflow
> > capabilities. This is where examples and better test coverage help keep
> > newbies productive.
> >
> > Perhaps the examples and tests can be upgraded to show a fuller set of
> > dynamic DAG capabilities?
> >
> > -s
> >
> > On Mon, Nov 21, 2016 at 7:55 PM, Maxime Beauchemin <
> > maximebeauche...@gmail.com> wrote:
> >
> > > I just added a bit of information about dynamic DAG creation here:
> > > https://github.com/apache/incubator-airflow/pull/1889/files#diff-c6f0a0722c6a2f86277535d7bcec7f8cR162
> > >
> > > Let me know if it helps.
> > >
> > > Max
> > >
> > > On Mon, Nov 21, 2016 at 2:58 AM, Deepak Kumar Malladi <
> > > kapeed2...@gmail.com> wrote:
> > >
> > > > Hi,
> > > >
> > > > I want to dynamically create DAGs at runtime. I tried the snippet given
> > > > in the documentation, but it didn't work for me.
> > > >
> > > > Any pointers on how to trigger DAGs which aren't actually present in the
> > > > DAG folder but are created through code execution (dynamically created)?
> > > >
> > > >
> > > > Thanks & Regards,
> > > > Deepak
> > > >
> > >
> >
>
