Hi,
I also have this kind of use case. We need to generate reports for many
users, where the set of users comes from specific queries.
I did something like this. Here is the relevant DAG definition:
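from airflow.models import Variable

# dag, task_id, start_weekly_mailer and post_execute are defined elsewhere in
# the DAG file; ExecuteFannedOutOperator is a custom operator.
# The number of parallel workers comes from a Variable (default: 5).
for i in range(int(Variable.get('weekly-mailer-worker', '5'))):
    send_email = ExecuteFannedOutOperator(
        task_id='execute-%s-%s' % (task_id, i),
        dag=dag,
        tolerance=100,
    )
    start_weekly_mailer >> send_email
    send_email >> post_execute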
start-weekly-mailer runs the queries and then sends the task definitions to
a queue (we used SQS). Each execute-weekly-mailer task pulls task
definitions from the queue and keeps running until the queue is empty.
post-execute is mostly for cleanup.
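A minimal, self-contained sketch of the producer/worker pattern described above, assuming an in-process `queue.Queue` stands in for SQS and threads stand in for the parallel execute tasks. The function names (`enqueue_tasks`, `drain_queue`, `run_fanned_out`) and the task-definition format are hypothetical, not the actual operator code.

```python
import queue
import threading

# Assumption: queue.Queue stands in for the SQS queue, and threads stand in
# for the parallel execute-* tasks in the DAG.


def enqueue_tasks(task_queue, user_ids):
    """Producer (the start-weekly-mailer role): one task definition per user."""
    for user_id in user_ids:
        task_queue.put({"user_id": user_id})


def drain_queue(task_queue, results, lock):
    """Worker (an execute-* task): pull task definitions until the queue is empty."""
    while True:
        try:
            task = task_queue.get_nowait()
        except queue.Empty:
            return  # nothing left to do; this worker finishes
        # Hypothetical placeholder for generating and sending one report.
        report = "report for user %d" % task["user_id"]
        with lock:
            results.append(report)


def run_fanned_out(user_ids, num_workers=5):
    task_queue = queue.Queue()
    enqueue_tasks(task_queue, user_ids)  # all tasks are queued before workers start
    results, lock = [], threading.Lock()
    workers = [
        threading.Thread(target=drain_queue, args=(task_queue, results, lock))
        for _ in range(num_workers)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
    # The post-execute (cleanup) step would run here, after every worker is done.
    return results


if __name__ == "__main__":
    print(len(run_fanned_out(range(20), num_workers=5)))
```

With real SQS, a short-polling receive can come back empty even while messages remain, so a worker would likely want long polling or a few retries before deciding the queue is drained.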
As you can see in the DAG definition, I use a Variable, so I can increase
the number of workers from the webserver (I can't decrease the number of
tasks that are already running, because, you know, this is a hack). I think
you could also set the Variable in start-weekly-mailer to define how many
parallel workers are needed, e.g. based on the number of result rows.
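One way to pick the worker count from the query result, as suggested above, is a small helper like this hypothetical `workers_needed`; the `rows_per_worker` and `max_workers` defaults are assumptions. start-weekly-mailer could then store the result with something like `Variable.set('weekly-mailer-worker', str(n))`, although the new count presumably only takes effect once the DAG file is parsed again, since the loop reads the Variable at parse time.

```python
import math


def workers_needed(row_count, rows_per_worker=1000, max_workers=20):
    """Hypothetical helper: how many parallel execute-* tasks to create.

    Always returns at least 1 so the start -> execute -> post-execute chain
    keeps its shape even when the query returns no rows, and never more
    than max_workers.
    """
    if row_count <= 0:
        return 1
    return min(max_workers, math.ceil(row_count / rows_per_worker))


if __name__ == "__main__":
    for rows in (0, 1000, 1001, 50000):
        print(rows, workers_needed(rows))
```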
I hope we can improve Airflow to handle this kind of use case as well.
--
*Adinata*
TOKI 2009
SMAN Plus Provinsi Riau 9th
13509022 - Informatika ITB 2009
Engineer - UrbanIndo.com
On Fri, Sep 9, 2016 at 6:12 AM, J C Lawrence wrote:
> On Thu, 8 Sep 2016 14:04:58 -0700, Ben Tallman wrote:
>
> > We have done this a lot, and the one issue is that every time the DAG
> > is evaluated (even during a run), the SQL will be re-run, and tasks
> > can vary. In fact, we had a select statement that actually marked
> > items as in process during select, and THAT was bad.
>
> Yeah, I'm keeping an eye on that.
>
> The problem I'm having however is that the DAGs are not getting
> populated with the tasks relevant to that specific scheduling run. Do
> you have this working under Airflow today?
>
> > We have moved to x number of tasks, and each one grabs a line from
> > the DB, and 0 to n of them can actually get skipped if they don't get
> > a line from the DB.
>
> Yeah, I just don't want to a) set up yet another DB/table for an
> interstitial process, or b) re-invent dispatch/locking against a DB
> yet again.
>
> -- JCL
>
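The quoted alternative (a fixed number of tasks that each grab one row from the DB and skip when none is left) needs an atomic claim so that two tasks never take the same row. A minimal sketch using `sqlite3`, assuming a hypothetical `work` table with a `status` column; the claim is a conditional UPDATE (compare-and-set) that runs inside the task, not at DAG-parse time, so re-evaluating the DAG file does not mark rows as in process.

```python
import sqlite3


def create_work_table(conn, payloads):
    """Hypothetical work table: one row per item, all starting as 'pending'."""
    conn.execute(
        "CREATE TABLE work (id INTEGER PRIMARY KEY, payload TEXT NOT NULL,"
        " status TEXT NOT NULL DEFAULT 'pending')"
    )
    conn.executemany("INSERT INTO work (payload) VALUES (?)", [(p,) for p in payloads])
    conn.commit()


def claim_one(conn):
    """Claim one pending row and return (id, payload), or None if none is left.

    The UPDATE only succeeds if the row is still 'pending', so when two
    workers race for the same row exactly one of them wins (compare-and-set).
    """
    while True:
        row = conn.execute(
            "SELECT id, payload FROM work WHERE status = 'pending' ORDER BY id LIMIT 1"
        ).fetchone()
        if row is None:
            return None  # nothing to do: the calling task can mark itself skipped
        with conn:  # commits the UPDATE on success
            cur = conn.execute(
                "UPDATE work SET status = 'in_process' WHERE id = ? AND status = 'pending'",
                (row[0],),
            )
        if cur.rowcount == 1:
            return row
        # Another worker claimed this row first; look for the next one.


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    create_work_table(conn, ["alice", "bob"])
    print(claim_one(conn), claim_one(conn), claim_one(conn))
```

With workers on several machines, the table would have to live in a shared database, which is the extra DB/table and locking the quoted message would rather avoid.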