Re: Basic modeling question

2018-08-08 Thread Andy Cooper
To expand on Taylor's idea:

I recently wrote a ScheduleBlackoutSensor that allows you to prevent a
task from running when it meets the provided criteria. It accepts an array of
args for any number of the criteria, so you could leverage this sensor to
black out runs on a range of days of the week.

https://github.com/apache/incubator-airflow/pull/3702/files

For example,

task = ScheduleBlackoutSensor(day_of_week=[0,1,2,3,4,5], dag=dag)

Would prevent a task from running Monday - Saturday, allowing it to run on
Sunday.

You could leverage this sensor as you would any other, or you could
invert the logic so that you only need to specify

task = ScheduleBlackoutSensor(day_of_week=6, dag=dag)

To "whitelist" a task to run on Sundays.


Let me know if you have any questions.

On Wed, Aug 8, 2018 at 1:47 PM Taylor Edmiston  wrote:

> Gabriel -
>
> One approach I've seen for a similar use case is to have multiple related
> rollups in one DAG that runs daily, then have the non-daily tasks skip most
> of the time (e.g., weekly only actually executes on Sundays and is
> parameterized to look at the last 7 days).
>
> You could implement that not running part a few ways, but one idea is a
> sensor in front of the weekly rollup task.  Imagine a SundaySensor like
> return
> execution_date.weekday() == 6.  One thing to keep in mind here is
> dependence on the DAG's cron schedule being more granular than the tasks.
>
> I think this could generalize into a DayOfWeekSensor / DayOfMonthSensor
> that would be nice to have.
>
> Of course this does mean some scheduler inefficiency on the skip days, but
> as long as those skips are fast and the overall number of tasks is small, I
> can accept that.
>
> *Taylor Edmiston*
> Blog | CV | LinkedIn | AngelList | Stack Overflow
>
>
> On Wed, Aug 8, 2018 at 1:11 PM, Gabriel Silk 
> wrote:
>
> > Hello Airflow community,
> >
> > I have a basic question about how best to model a common data pipeline
> > pattern here at Dropbox.
> >
> > At Dropbox, all of our logs are ingested and written into Hive in hourly
> > and/or daily rollups. On top of this data we build many weekly and
> monthly
> > rollups, which typically run on a daily cadence and compute results over
> a
> > rolling window.
> >
> > If we have a metric X, it seems natural to put the daily, weekly, and
> > monthly rollups for metric X all in the same DAG.
> >
> > However, the different rollups have different dependency structures. The
> > daily job only depends on a single day partition, whereas the weekly job
> > depends on 7, the monthly on 28.
> >
> > In Airflow, it seems the two paradigms for modeling dependencies are:
> > 1) Depend on a *single run of a task* within the same DAG
> > 2) Depend on *multiple runs of task* by using an ExternalTaskSensor
> >
> > I'm not sure how I could possibly model this scenario using approach #1,
> > and I'm not sure approach #2 is the most elegant or performant way to
> model
> > this scenario.
> >
> > Any thoughts or suggestions?
> >
>


Re: Airflow - High Availability and Scale Up vs Scale Out

2018-06-08 Thread Andy Cooper
Once you solve DAG deployments and container orchestration, the
CeleryExecutor becomes more interesting. We solve DAG deployments by
putting the DAG code into the container at build time and triggering image
updates on our Kubernetes cluster via webhooks from a private Docker
registry. We are currently using the CeleryExecutor to scale out vs. up, but
have begun to explore the KubernetesExecutor to further simplify our stack.

It seems like you would want to go the route of separating concerns as well
if you want to move towards HA. More generally though, I don't think true
HA can be achieved with the current scheduler architecture, which requires
that only one scheduler run at a time. The next best thing is to put the
scheduler under container orchestration that restarts it immediately on
failure, at which point it will continue scheduling work where it left off.

- Andy Cooper

On Fri, Jun 8, 2018 at 7:24 AM Sam Sen  wrote:

> We are facing this now. We have tried the CeleryExecutor and it adds more
> moving parts. While we have not thrown out this idea, we are going to give
> one big beefy box a try.
>
> To handle the HA side of things, we are putting the server in an
> auto-scaling group (we use AWS) with a min and max of 1 server. We deploy
> from an AMI that has airflow baked in and we point the DB config to an RDS
> using service discovery (consul).
>
> As for the dag code, we can either bake it into the AMI as well or install
> it on bootup. We haven't decided what to do for this but either way, we
> realize it could take a few minutes to fully recover in the event of a
> catastrophe.
>
> The other option is to have a standby server if using celery isn't ideal.
> With that, I have tried using Hashicorp nomad to handle the services. In my
> limited trial, it did what we wanted but we need more time to test.
>
> On Fri, Jun 8, 2018, 4:23 AM Naik Kaxil  wrote:
>
> > Hi guys,
> >
> >
> >
> > I have 2 specific questions for the guys using Airflow in production:
> >
> >
> >
> >    1. How have you achieved High Availability? What does the architecture
> >    look like? Do you replicate the master node as well?
> >    2. Scale Up vs Scale Out?
> >       1. What is the preferred approach you take? One beefy Airflow VM with
> >       Worker, Scheduler and Webserver using the LocalExecutor, or a cluster
> >       with multiple workers using the CeleryExecutor?
> >
> >
> >
> > I think this thread should help others as well with similar questions.
> >
> >
> >
> >
> >
> > Regards,
> >
> > Kaxil
> >
> >
> >
> >
> > Kaxil Naik
> >
> > Data Reply
> > 2nd Floor, Nova South
> > 160 Victoria Street, Westminster
> > London SW1E 5LB - UK
> > phone: +44 (0)20 7730 6000
> > k.n...@reply.com
> > www.reply.com
> >
> >
>


Re: Capturing data changes that happen after the initial data pull

2018-06-06 Thread Andy Cooper
Are the endpoints to fetch data for the previous week the same as the
endpoints you use to fetch daily data (with only the date filter parameters
being different)?
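
If so, one option is a small second DAG that re-pulls the finalized window
every Tuesday. A rough sketch (the fetch/load call is a hypothetical stand-in
for the logic your daily DAG already uses, and the window offsets are
illustrative; adjust them for how you key off execution_date given Airflow's
end-of-interval scheduling):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def repull_previous_week(ds, **kwargs):
    # Treat 'ds' (the execution date string) as the Tuesday on which the
    # prior week (Sunday through Saturday) becomes final.
    end = datetime.strptime(ds, '%Y-%m-%d') - timedelta(days=3)
    start = end - timedelta(days=6)
    # Here you would call the same fetch/load logic the daily DAG uses, just
    # with a wider date filter (hypothetical helper):
    # fetch_and_load(start.date(), end.date())
    print('re-pulling %s through %s' % (start.date(), end.date()))


dag = DAG(
    'weekly_repull',
    start_date=datetime(2018, 6, 5),
    schedule_interval='0 14 * * 2',  # Tuesdays, after the revised data lands
)

repull = PythonOperator(
    task_id='repull_previous_week',
    python_callable=repull_previous_week,
    provide_context=True,
    dag=dag,
)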

On Wed, Jun 6, 2018 at 1:47 PM Pedro Machado  wrote:

> I am working with an API that provides daily data the day after the period
> completes. For example, 2018-06-01 data is available on 2018-06-02 at 12
> PM.
>
> I have a daily DAG that pulls this data and loads it into Redshift.
>
> The issue is that this data provider says that the data may be revised and
> it won't be finalized until the Tuesday after the end of the week.
>
> For example, for the week of 2018-05-27 through 2018-06-02, the data will
> be "final" on Tuesday 2018-06-05.
>
> I'd like to add another DAG that takes care of repulling the data for the
> previous week every Tuesday and I am wondering about the best way to
> implement this.
>
> Should I just develop another DAG that pulls one week at a time using the
> appropriate dates?
>
> Is there a way to leverage the existing daily DAG and have another dag
> trigger it with the appropriate execution date? If so, I suppose it would
> create new DAG runs. How will I be able to tell these new dag runs apart
> from the daily ones if they have the same execution dates?
>
> Thanks,
>
> Pedro
>


EmailSensorOperator

2018-05-30 Thread Andy Cooper
All,

I was recently answering a Stack Overflow question that involved a workflow
depending on an email being received. My knee-jerk reaction was to reply
that the user could leverage the EmailSensorOperator to accomplish this
task. I was typing out the answer when I decided to link to the actual
operator. Much to my surprise, I could not find it!

Is there a version of an IMAP/SMTP/EmailSensorOperator out there that I am
not able to locate? If not, is this something the community would be
interested in?
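
If nothing exists yet, here is a rough sketch of the poke logic I have in
mind (using the standard-library imaplib; the class name and arguments are
illustrative, and the BaseSensorOperator import path varies by Airflow
version):

from airflow.operators.sensors import BaseSensorOperator
from airflow.utils.decorators import apply_defaults
import imaplib


class EmailSensor(BaseSensorOperator):
    """Succeeds once an unread message with the given subject is in the inbox."""

    @apply_defaults
    def __init__(self, host, user, password, subject, *args, **kwargs):
        super(EmailSensor, self).__init__(*args, **kwargs)
        self.host = host
        self.user = user
        self.password = password
        self.subject = subject

    def poke(self, context):
        conn = imaplib.IMAP4_SSL(self.host)
        try:
            conn.login(self.user, self.password)
            conn.select('INBOX', readonly=True)
            # Look for unread messages whose subject contains the string.
            _, data = conn.search(None, '(UNSEEN SUBJECT "%s")' % self.subject)
            return bool(data[0].split())
        finally:
            conn.logout()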

- Andy Cooper


Re: Airflow with Celery

2018-05-15 Thread Andy Cooper
I have had very similar issues when there was a problem with the connection
string pointing to the message broker. Triple-check those connection
strings and attempt to connect outside of Airflow.
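
For example, a quick sanity check of the broker URL from a plain Python
shell (kombu ships with Celery; the URL below is a placeholder, so use the
exact broker_url value from your airflow.cfg):

from kombu import Connection

# Fails fast with a clear error if the host, port, vhost, or credentials
# in the broker URL are wrong.
with Connection('redis://localhost:6379/0') as conn:
    conn.ensure_connection(max_retries=3)
    print('broker reachable')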

On Tue, May 15, 2018 at 9:27 AM Goutham Pratapa 
wrote:

> Hi all,
>
> I have been using airflow with Celery executor in the background
>
> https://hastebin.com/sipecovomi.ini --> airflow.cfg
>
> https://hastebin.com/urutokuvoq.py   --> The dag I have been using
>
>
>
> This shows that the dag is always in running state.
>
>
>
>
> Airflow flower shows nothing in the tasks or in the broker.
>
>
> Did I miss anything? Can anyone help me in this regard?
>
>
> --
> Cheers !!!
> Goutham Pratapa
>


Re: Airflow Docker Container

2018-05-14 Thread Andy Cooper
As Scott pointed out, we at Astronomer.io have taken the lightweight image
philosophy to heart with our Docker images. Our base image inherits from
Alpine (https://github.com/astronomerio/astronomer) and our Airflow image
(https://github.com/astronomerio/astronomer/blob/master/docker/platform/airflow/Dockerfile)
layers on that. We then have another layer
(https://github.com/astronomerio/astronomer/blob/master/docker/platform/airflow/onbuild/Dockerfile)
which can be used to install system-level packages and Python dependencies
from packages.txt and requirements.txt.

If, for some reason, these images don't check all your boxes, let us know
and we'll see what we can do to make them better for you.

-Andy Cooper

On Mon, May 14, 2018 at 4:44 PM Joe Napolitano 
wrote:

> You may consider this base image we put together at Blue Apron. My fork
> fixes a build issue by pinning to pip < 10.
>
> https://github.com/joenap/airflow-base
>
> Joe Nap
>
> On Mon, May 14, 2018 at 4:37 PM, Daniel Imberman <
> daniel.imber...@gmail.com>
> wrote:
>
> > @Fokko
> >
> > I definitely agree with that. I think that having a "super lightweight"
> > image for just running a basic airflow instance makes sense. We could
> even
> > name the image something like  airflow-k8s so people know it's ONLY meant
> > to work in a k8s cluster. I'm trying to figure out what methods besides
> > helm we should be considering (Helm doesn't really have full saturation
> in
> > the k8s world so wanna see if there are other deployment tools we should
> > consider).
> >
> > @Scott Dang quite a bit is definitely an understatement :). Would anyone
> on
> > your team have some cycles to work with @jzucker or @sedwards on the
> > helm/deployment stuff?
> >
> > On Mon, May 14, 2018 at 1:18 PM Driesprong, Fokko 
> > wrote:
> >
> > > Hi Daniel,
> > >
> > > My dear colleague from GoDataDriven, Bas Harenslak, started on building
> > an
> > > official Docker container on the Dockerhub. I've put him in the CC. In
> > the
> > > end I strongly believe the image should end up in the official Docker
> > > repository: https://github.com/docker-library/official-images
> > >
> > > Right now, the excellent images provided by Puckel are widely used for
> > > running Airflow in Docker. For the Kubernetes build we need to pull in
> > some
> > > additional dependencies. Maybe a good idea to do this separately from
> the
> > > one from Puckel, to keep his images lightweight. Any thoughts?
> > >
> > > Kind regards,
> > > Fokko Driesprong
> > >
> > >
> > > 2018-05-14 22:09 GMT+02:00 Anirudh Ramanathan <
> > > ramanath...@google.com.invalid>:
> > >
> > >> @Erik Erlandson has had conversations about publishing
> > >> docker images with the ASF Legal team.
> > >> Adding him to the thread.
> > >>
> > >> On Mon, May 14, 2018 at 1:07 PM Daniel Imberman <
> > >> daniel.imber...@gmail.com>
> > >> wrote:
> > >>
> > >> > Hi everyone,
> > >> >
> > >> > I've started looking into creating an official airflow docker
> > container
> > >> > s.t. users of the KubernetesExecutor could auto-pull from helm
> > >> > charts/deployment yamls/etc. I was wondering what everyone thinks
> the
> > >> best
> > >> > way to do this would be? Is there an official apache docker repo? Is
> > >> there
> > >> > a preferred linux distro?
> > >> >
> > >> > cc: @anirudh since this was something you had to deal with for
> > >> > spark-on-k8s.
> > >> >
> > >>
> > >>
> > >> --
> > >> Anirudh Ramanathan
> > >>
> > >
> >
>


Re: How to control the frequency that Airflow refreshes dag?

2017-12-05 Thread Andy Cooper
Agreed. We love the dynamic DAG generation ourselves. But as far as I know, once
a DAG is created (dynamically or statically), it is best not to change it
without versioning the DAG and creating a new one.

I will let others answer as to whether or not that is on the feature roadmap.

> On Dec 5, 2017, at 11:22 AM, Yifei Hong  wrote:
> 
> Andy, thanks much for the prompt response, this is helpful.
> 
> So scheduler_heartbeat_sec controls how frequently the scheduler will run
> and refresh the dags, right? Then in my case, as we retrieve data and
> dynamically generate subdags, if the current run is still ongoing and the
> external data source changes (i.e. from 3 run requests to 5), I feel the dag
> refresh will cause problems and mess up the running dag. Will Airflow
> support this kind of usage? Dynamic dag generation is one of the Airflow
> features that I like the most.
> 
> On Tue, Dec 5, 2017 at 11:02 AM, Andy Cooper 
> wrote:
> 
>> Yifei,
>> 
>> This is because the DAG definition file is not stored in the database. The
>> scheduler must refresh the display of the DAG and its tasks from the DAG
>> file itself. This means that it will be refreshed every time the scheduler
>> is run. The schedule_interval refers to the schedule at which the DAG will
>> run, not at which it will be refreshed.
>> 
>> I would recommend not refreshing the task_ids and instead using static
>> task_ids to handle an array/list of tasks.
>> 
>> 
>>> On Dec 5, 2017, at 10:58 AM, Yifei Hong  wrote:
>>> 
>>> Hi, Community,
>>> 
>>> We use the main dag in Airflow to retrieve data from a database, and then
>>> dynamically create a list of subdags for parallel runs with
>> LocalExecutor.
>>> For some task_ids, we construct them from now() to ensure uniqueness.
>>> 
>>> One problem we encounter is that Airflow keeps refreshing the dag so that
>>> the task_id gets updated accordingly, even though we did not schedule the
>>> dag and only run it via backfill.
>>> 
>>> Could you please help advise how we can control the refresh frequency?
>> Why
>>> does Airflow not follow the scheduled frequency to refresh?
>>> 
>>> Many thanks in advance,
>>> Yifei
>> 
>> 



Re: How to control the frequency that Airflow refreshes dag?

2017-12-05 Thread Andy Cooper
Yifei,

This is because the DAG definition file is not stored in the database. The 
scheduler must refresh the display of the DAG and its tasks from the DAG file
itself. This means that it will be refreshed every time the scheduler is run. 
The schedule_interval refers to the schedule at which the DAG will run, not at 
which it will be refreshed.

I would recommend not refreshing the task_ids and instead using static task_ids
to handle an array/list of tasks.
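
For example (a rough sketch; the row data is a stand-in for whatever you
retrieve from the external database):

from datetime import datetime

# Stand-in for the rows pulled from the external data source.
rows = [{'name': 'client_a'}, {'name': 'client_b'}, {'name': 'client_c'}]

for i, row in enumerate(rows):
    # Unstable: changes every time the scheduler re-parses the DAG file,
    # so Airflow sees "new" tasks on every refresh.
    unstable_id = 'subdag_%s' % datetime.now().strftime('%Y%m%d%H%M%S%f')
    # Stable: derived from the data itself, identical across parses.
    stable_id = 'subdag_%s' % row['name']
    print(unstable_id, stable_id)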


> On Dec 5, 2017, at 10:58 AM, Yifei Hong  wrote:
> 
> Hi, Community,
> 
> We use the main dag in Airflow to retrieve data from a database, and then
> dynamically create a list of subdags for parallel runs with LocalExecutor.
> For some task_ids, we construct them from now() to ensure uniqueness.
> 
> One problem we encounter is that Airflow keeps refreshing the dag so that
> the task_id gets updated accordingly, even though we did not schedule the
> dag and only run it via backfill.
> 
> Could you please help advise how we can control the refresh frequency? Why
> does Airflow not follow the scheduled frequency to refresh?
> 
> Many thanks in advance,
> Yifei



INFO REQUEST: Scheduler Multiprocessing

2017-12-05 Thread Andy Cooper
When running Airflow with a single dynamic DAG file generating multiple DAGs,
the scheduler no longer forks processes on a per-DAG basis and instead
only forks on a per-DAG-file basis. We are trying to understand why this
decision was made and whether there are any suggestions on how to improve
scheduler performance when an Airflow instance contains one or more DAG files
that generate many DAGs.

-Andy Cooper