Also, you might wish to add a Pool so that only N tasks run against your
Postgres server at any one time.
We have pools for all of our resources.
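
For example, a minimal sketch (the pool name, slot count, and import_func
are illustrative, and the pool itself must already exist, created under
Admin -> Pools in the UI):

    from airflow.operators import PythonOperator

    # Any task created with pool="postgres_pool" is only scheduled while
    # one of that pool's N slots is free, capping concurrent DB load.
    load = PythonOperator(
        task_id="import_file",
        python_callable=import_func,  # the callable that hits Postgres
        pool="postgres_pool",         # assumed pool with N slots
        dag=dag,                      # the enclosing DAG
    )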


On Thu, Sep 8, 2016 at 2:04 PM, Ben Tallman <[email protected]> wrote:

> We have done this a lot, and the one issue is that every time the DAG is
> evaluated (even during a run), the SQL is re-run, so the set of tasks can
> vary between evaluations. In fact, we once had a select statement that
> marked items as in-process during the select itself, and THAT was bad.
>
> We have moved to a fixed number of tasks; each one grabs a row from the
> DB at run time, and 0 to n of them get skipped if there is no row left
> for them to grab.
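>
> A minimal sketch of that pattern (claim_row and import_file are
> illustrative placeholders, not our production code): a fixed fan-out of
> N tasks, where each task claims a row at run time and raises
> AirflowSkipException when nothing is left.
>
>     from airflow.exceptions import AirflowSkipException
>     from airflow.operators import PythonOperator
>
>     def process_one (**context):
>         # claim atomically, e.g. with UPDATE ... RETURNING, so that two
>         # slots can never grab the same row
>         row = claim_row ()
>         if row is None:
>             raise AirflowSkipException ("no row left for this slot")
>         import_file (row)
>
>     for i in range (10):  # fixed upper bound on parallelism
>         PythonOperator (
>             task_id = "import_slot_%d" % i,
>             provide_context = True,
>             python_callable = process_one,
>             dag = dag)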
>
> To be clear, we would really like the DAG's tasks to be frozen at
> schedule time, but that has not been our experience, and I believe it
> would take a fairly major refactor. Furthermore, I believe it is in the
> nature of Airflow's DAG (Directed Acyclic Graph) handling that the
> definition is re-evaluated during runtime, and that the path is not
> determinate until runtime.
>
>
> Thanks,
> Ben
>
> *--*
> *ben tallman* | *apigee <http://www.apigee.com/>*
>  | m: +1.503.680.5709 | o: +1.503.608.7552 | twitter @anonymousmanage
> <http://twitter.com/anonymousmanage> @apigee <https://twitter.com/apigee>
>
> On Thu, Sep 8, 2016 at 1:50 PM, J C Lawrence <[email protected]> wrote:
>
> > I have a few hundred thousand files arriving from an external service
> > each day and would like to ETL their contents into my store with
> > Airflow.  As the files are large and numerous and slow to process, I'd
> > also like to process them in parallel...so I thought something like
> > this:
> >
> >     from airflow import DAG
> >     from airflow import hooks, operators
> >
> >     def sub_dag (
> >         parent_dag_name,
> >         child_dag_name,
> >         start_date,
> >         schedule_interval):
> >       dag = DAG(
> >         "%s.%s" % (parent_dag_name, child_dag_name),
> >         schedule_interval = schedule_interval,
> >         start_date = start_date,
> >       )
> >       fan_out = operators.DummyOperator(
> >         task_id = "fan_out",
> >         dag = dag,
> >       )
> >       fan_in = operators.DummyOperator(
> >         task_id = "fan_in",
> >         dag = dag,
> >       )
> >       cur = hooks.PostgresHook ("MY_DB").get_conn ().cursor ()
> >       cur.execute ("""SELECT file_id
> >                      FROM some_table
> >                      WHERE something;""".format (foo = func(start_date)))
> >       for rec in cur:
> >         fid = rec[0]
> >         o = operators.PythonOperator (
> >           task_id = "ImportThing__%s" % fid,
> >           provide_context = True,
> >           python_callable = import_func,
> >           params = {"file_id": fid,},
> >           dag = dag)
> >         o.set_upstream (fan_out)
> >         o.set_downstream (fan_in)
> >       cur.close ()
> >       return dag
> >
> > The idea being that the number and identity of the tasks in the sub-DAG
> > would vary dynamically depending on which day it is running for (i.e.
> > which rows come back from the query for that day). But...no, this
> > doesn't seem to work.
> >
> > Any recommendations for how to approach this?
> >
> > -- JCL
> >
>



-- 
Lance Norskog
[email protected]
Redwood City, CA
