A DAG Run is a run of a particular DAG at a particular time. For example,
if your DAG were called FOO and its schedule interval were @hourly, then
you would have DAG runs: FOO @ noon, FOO @ 1pm, FOO @ 2pm, etc.

If you need to seed a table prior to each run, just add a task at the start
of your DAG that loads the target table. You can do this simply with a
PythonOperator and your own custom Python callable -- the callable would
insert the necessary data.
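
For example, something along these lines (an untested sketch; the connection
id, table name, and values are placeholders, and import paths differ across
Airflow versions):

    # Sketch only -- adjust imports, connection id, and table to your setup.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from airflow.hooks.mysql_hook import MySqlHook


    def seed_table(**context):
        # Insert whatever static rows the downstream tasks expect.
        hook = MySqlHook(mysql_conn_id='my_mysql')
        hook.run("INSERT INTO foo_static (k, v) VALUES ('region', 'us-east-1')")


    dag = DAG(
        dag_id='FOO',
        start_date=datetime(2016, 11, 1),
        schedule_interval='@hourly',
    )

    seed = PythonOperator(
        task_id='seed_table',
        python_callable=seed_table,
        provide_context=True,
        dag=dag,
    )

    # Downstream tasks would then set seed as their upstream,
    # e.g. seed.set_downstream(rest_of_dag)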

I would warn that it is possible for Airflow to schedule DAG runs for FOO
in parallel unless you specify depends_on_past=True. You should probably
structure your table to be keyed by the DAG run so that you can run
multiple DAG runs concurrently (i.e. depends_on_past=False); a variant
along those lines is sketched below.
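
For example, the seeding callable could key each row by the run's
execution_date (again just a sketch; the table and column names are made up):

    def seed_table(execution_date=None, **context):
        # Key the seeded rows by the run's execution_date so that concurrent
        # DAG runs each see only their own rows.
        hook = MySqlHook(mysql_conn_id='my_mysql')
        hook.run(
            "INSERT INTO foo_static (execution_date, k, v) VALUES (%s, %s, %s)",
            parameters=(execution_date.isoformat(), 'region', 'us-east-1'),
        )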

-s

On Tue, Nov 1, 2016 at 7:09 AM, Michael Gong <go...@hotmail.com> wrote:

> Hi,
>
> I have a MySQL table which will store some static information. The
> information could be different for different Airflow runs, so I hope to use
> Python code to initialize it whenever Airflow starts.
>
>
> Where is the best place to put such code?
>
>
> Is the class DagBag's __init__() a good candidate?
>
>
> Please advise.
>
>
> Thanks.
>
>
> #############################################
>
> class DagBag(LoggingMixin):
>     """
>     A dagbag is a collection of dags, parsed out of a folder tree and has
>     high level configuration settings, like what database to use as a
>     backend and what executor to use to fire off tasks. This makes it
>     easier to run distinct environments for say production and development,
>     tests, or for different teams or security profiles. What would have
>     been system level settings are now dagbag level so that one system can
>     run multiple, independent settings sets.
>
>     :param dag_folder: the folder to scan to find DAGs
>     :type dag_folder: str
>     :param executor: the executor to use when executing task instances
>         in this DagBag
>     :param include_examples: whether to include the examples that ship
>         with airflow or not
>     :type include_examples: bool
>     :param sync_to_db: whether to sync the properties of the DAGs to
>         the metadata DB while finding them, typically should be done
>         by the scheduler job only
>     :type sync_to_db: bool
>     """
>     def __init__(
>             self,
>             dag_folder=None,
>             executor=DEFAULT_EXECUTOR,
>             include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'),
>             sync_to_db=False):
>
>         dag_folder = dag_folder or DAGS_FOLDER
>         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
>         self.dag_folder = dag_folder
>         self.dags = {}
>         self.sync_to_db = sync_to_db
>         self.file_last_changed = {}
>         self.executor = executor
>         self.import_errors = {}
>         if include_examples:
>             example_dag_folder = os.path.join(
>
> ...
>
> #############################
>
>
