Airflow Scalability with Local Executor

2018-03-28 Thread ramandumcs
Hi All,
We have a use case to support 1000 concurrent DAGs. These DAGs would have a
couple of HTTP tasks which submit jobs to external services. Each DAG could run
for a couple of hours.
The HTTP tasks periodically check (with a 20-second sleep) the job status.
We tried running 1000 such DAGs (parallelism set to 1000) with Airflow's
LocalExecutor mode, but after 100 concurrent runs tasks started failing due to:
--> OOM errors
--> The scheduler marking them failed because of a lack of heartbeat.
We are using 4 cores and 16 GB RAM. Each Airflow worker is taking ~250 MB of
virtual memory and ~60 MB of resident memory, which seems to be on the higher
side. CPU utilisation is also ~98%.
Is there anything that can be done to optimise memory/CPU for the Airflow workers?
Any pointer to Airflow benchmarking with the LocalExecutor would also be helpful.
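For reference, a minimal sketch of the kind of polling task described above (the
endpoint, field names and helper are illustrative, not our actual code):

import time
import requests

def poll_job(job_url):
    # submit-then-poll pattern: the worker process stays alive (and keeps its
    # memory) for the whole duration of the external job
    while True:
        status = requests.get(job_url).json().get('status')
        if status in ('SUCCEEDED', 'FAILED'):
            return status
        time.sleep(20)

Each such task keeps one LocalExecutor worker process resident for hours, which is
where the per-task memory footprint adds up.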


Cancel a Running dag

2018-04-12 Thread ramandumcs
Hi All,
We have a use case to cancel an already-running DAG. Is there any recommended way
to do so?

Thanks,
Raman


Benchmarking of Airflow Scheduler with Celery Executor

2018-04-12 Thread ramandumcs
Hi All,
We have a requirement to run 10k(s) of concurrent tasks, and we are exploring
Airflow's CeleryExecutor for this. Horizontal scaling of worker nodes seems
possible, but there can only be one active scheduler.
So will the Airflow scheduler be able to handle this many concurrent tasks?
Are there any benchmarking numbers around the Airflow scheduler's scalability?
Thanks,
Raman 
 



Re: Cancel a Running dag

2018-04-12 Thread ramandumcs
Thanks Laura,
We are using the CeleryExecutor. Just wondering if marking the TaskInstances as 
failed in metadata store would also work.
-Raman

On 2018/04/12 16:27:00, Laura Lorenz  wrote: 
> I use the CeleryExecutor and have used a mix of `celery control` and
> messaging queue purges to kill the running tasks and prevent them from
> being picked up by workers again (respectively), and doctor the DagRun to
> failed to stop the scheduler from repopulating the message. I think if you
> are using the Local or Sequential Executor you'd have to kill the scheduler
> process.
> 
> Laura
> 
> On Thu, Apr 12, 2018 at 12:05 PM, Taylor Edmiston 
> wrote:
> 
> > I don't think killing a currently running task is possible today.
> >
> > Of course you can pause it from the CLI or web UI so that future runs don't
> > get triggered, but it sounds like that's not what you're looking for.
> >
> > Best,
> > Taylor
> >
> > *Taylor Edmiston*
> > Blog  | Stack Overflow CV
> >  | LinkedIn
> >  | AngelList
> > 
> >
> >
> > On Thu, Apr 12, 2018 at 11:26 AM, ramandu...@gmail.com <
> > ramandu...@gmail.com
> > > wrote:
> >
> > > Hi All,
> > > We have a use case to cancel the already running DAG. So is there any
> > > recommended way to do so.
> > >
> > > Thanks,
> > > Raman
> > >
> >
> 


Re: Cancel a Running dag

2018-04-13 Thread ramandumcs
Thanks Bolke,
Will it become part of the Airflow 1.10 release? Is there any tentative timeline
for it?
-Raman Gupta

On 2018/04/12 19:19:07, Bolke de Bruin  wrote: 
> This is now fixed in master. Clearing tasks will now properly terminate a 
> running task. If you pause the dag run no new tasks will be scheduled.
> 
> B.
> 
> 
> 
> Verstuurd vanaf mijn iPad
> 
> > Op 12 apr. 2018 om 20:23 heeft Laura Lorenz  het 
> > volgende geschreven:
> > 
> > That won't stop them if they are already running in a celery worker or
> > already in your messaging queue backend (e.g. rabbitmq; redis), but it will
> > prevent the message to do them from being emitted again by the airflow
> > scheduler to your messaging queue backend. To be thorough you have to do
> > both - stop the scheduler from scheduling the tasks anymore (by failing
> > them individually and/or the DagRun in the metadata database) and, if you
> > want to make sure the tasks that already got picked up stop and don't try
> > again, you have to kill their worker processes and make sure your messaging
> > queue is clean of messages of that task type. If you don't care that any
> > already started or queued up tasks finish, you can simply doctor the
> > metadata database.
> > 
> > Laura
> > 
> > On Thu, Apr 12, 2018 at 12:40 PM, ramandu...@gmail.com  >> wrote:
> > 
> >> Thanks Laura,
> >> We are using the CeleryExecutor. Just wondering if marking the
> >> TaskInstances as failed in metadata store would also work.
> >> -Raman
> >> 
> >>> On 2018/04/12 16:27:00, Laura Lorenz  wrote:
> >>> I use the CeleryExecutor and have used a mix of `celery control` and
> >>> messaging queue purges to kill the running tasks and prevent them from
> >>> being picked up by workers again (respectively), and doctor the DagRun to
> >>> failed to stop the scheduler from repopulating the message. I think if
> >> you
> >>> are using the Local or Sequential Executor you'd have to kill the
> >> scheduler
> >>> process.
> >>> 
> >>> Laura
> >>> 
> >>> On Thu, Apr 12, 2018 at 12:05 PM, Taylor Edmiston 
> >>> wrote:
> >>> 
>  I don't think killing a currently running task is possible today.
>  
>  Of course you can pause it from the CLI or web UI so that future runs
> >> don't
>  get triggered, but it sounds like that's not what you're looking for.
>  
>  Best,
>  Taylor
>  
>  *Taylor Edmiston*
>  Blog  | Stack Overflow CV
>   | LinkedIn
>   | AngelList
>  
>  
>  
>  On Thu, Apr 12, 2018 at 11:26 AM, ramandu...@gmail.com <
>  ramandu...@gmail.com
> > wrote:
>  
> > Hi All,
> > We have a use case to cancel the already running DAG. So is there any
> > recommended way to do so.
> > 
> > Thanks,
> > Raman
> > 
>  
> >>> 
> >> 
> 


Re: Benchmarking of Airflow Scheduler with Celery Executor

2018-04-13 Thread ramandumcs
Thanks Ry,
Just wondering if there is any approximate number for how many concurrent tasks a
scheduler can run on, say, a 16 GB RAM, 8-core machine.
If such benchmarking has already been done, that would be useful.
We did some benchmarking with the LocalExecutor and observed that each
TaskInstance was taking ~100 MB of memory, so we could only run ~130 concurrent
tasks on a 16 GB RAM, 8-core machine.

-Raman Gupta  

 

On 2018/04/12 16:32:37, Ry Walker  wrote: 
> Hi Raman -
> 
> First, we’d be happy to help you test this out with Airflow. Or you could
> do it yourself by using http://open.astronomer.io/airflow/ (w/ Docker
> Engine + Docker Compose) to quickly spin up a test environment. Everything
> is hooked to Prometheus/Grafana to monitor how the system reacts to your
> workload.
> 
> -Ry
> CEO, Astronomer
> 
> On April 12, 2018 at 12:23:46 PM, ramandu...@gmail.com (ramandu...@gmail.com)
> wrote:
> 
> Hi All,
> We have requirement to run 10k(s) of concurrent tasks. We are exploring
> Airflow's Celery Executor for same. Horizontally Scaling of worker nodes
> seem possible but it can only have one active scheduler.
> So will Airflow scheduler be able to handle these many concurrent tasks.
> Is there any benchmarking number around airflow scheduler's scalability.
> Thanks,
> Raman
> 


Re: Cancel a Running dag

2018-04-18 Thread ramandumcs
We are exploring the following approach for DAG cancellation. Please let us know if
you see any issue with it:
1) Set/create the XCom variable "cancel":"true". It would be set out of band by
updating the xcom table in the metadata store.
2) Operators would have code to periodically check for the "cancel" flag, reading
it from XCom (see the sketch after this list).
3) If the flag is set, the operators would do the necessary clean-up and exit.
4) Through the DAG's dependency conditions we can ensure that downstream tasks are
not triggered and are skipped.
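A minimal sketch of the operator-side check in step 2 (class and helper names are
hypothetical; the task_id/key are assumed to match whatever row is written out of
band in step 1):

import time
from airflow.models import BaseOperator

class CancellableOperator(BaseOperator):
    def execute(self, context):
        ti = context['ti']
        while not self._job_finished():                  # hypothetical helper
            # the 'cancel' flag is written out of band into the xcom table
            if ti.xcom_pull(task_ids='control', key='cancel') == 'true':
                self._cleanup()                          # hypothetical helper
                return                                   # step 3: clean up and exit
            time.sleep(20)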

Thanks,
Raman Gupta

On 2018/04/13 08:33:50, "Driesprong, Fokko"  wrote: 
> Like Bolke said, it has been fixed in master. One of the perquisites is
> support by the operator. For example, the Spark operator has implemented
> how to kill the Spark job on YARN, Local and Kubernetes. If you are running
> something else, you might want to check if this is implemented.
> 
> Implemented on_kill: https://github.com/apache/incubator-airflow/pull/3204
> An example of the on_kill:
> https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/hooks/spark_submit_hook.py#L485-L534
> 
> Cheers, Fokko
> 
> 2018-04-12 21:19 GMT+02:00 Bolke de Bruin :
> 
> > This is now fixed in master. Clearing tasks will now properly terminate a
> > running task. If you pause the dag run no new tasks will be scheduled.
> >
> > B.
> >
> >
> >
> > Verstuurd vanaf mijn iPad
> >
> > > Op 12 apr. 2018 om 20:23 heeft Laura Lorenz 
> > het volgende geschreven:
> > >
> > > That won't stop them if they are already running in a celery worker or
> > > already in your messaging queue backend (e.g. rabbitmq; redis), but it
> > will
> > > prevent the message to do them from being emitted again by the airflow
> > > scheduler to your messaging queue backend. To be thorough you have to do
> > > both - stop the scheduler from scheduling the tasks anymore (by failing
> > > them individually and/or the DagRun in the metadata database) and, if you
> > > want to make sure the tasks that already got picked up stop and don't try
> > > again, you have to kill their worker processes and make sure your
> > messaging
> > > queue is clean of messages of that task type. If you don't care that any
> > > already started or queued up tasks finish, you can simply doctor the
> > > metadata database.
> > >
> > > Laura
> > >
> > > On Thu, Apr 12, 2018 at 12:40 PM, ramandu...@gmail.com <
> > ramandu...@gmail.com
> > >> wrote:
> > >
> > >> Thanks Laura,
> > >> We are using the CeleryExecutor. Just wondering if marking the
> > >> TaskInstances as failed in metadata store would also work.
> > >> -Raman
> > >>
> > >>> On 2018/04/12 16:27:00, Laura Lorenz  wrote:
> > >>> I use the CeleryExecutor and have used a mix of `celery control` and
> > >>> messaging queue purges to kill the running tasks and prevent them from
> > >>> being picked up by workers again (respectively), and doctor the DagRun
> > to
> > >>> failed to stop the scheduler from repopulating the message. I think if
> > >> you
> > >>> are using the Local or Sequential Executor you'd have to kill the
> > >> scheduler
> > >>> process.
> > >>>
> > >>> Laura
> > >>>
> > >>> On Thu, Apr 12, 2018 at 12:05 PM, Taylor Edmiston  > >
> > >>> wrote:
> > >>>
> >  I don't think killing a currently running task is possible today.
> > 
> >  Of course you can pause it from the CLI or web UI so that future runs
> > >> don't
> >  get triggered, but it sounds like that's not what you're looking for.
> > 
> >  Best,
> >  Taylor
> > 
> >  *Taylor Edmiston*
> >  Blog  | Stack Overflow CV
> >   | LinkedIn
> >   | AngelList
> >  
> > 
> > 
> >  On Thu, Apr 12, 2018 at 11:26 AM, ramandu...@gmail.com <
> >  ramandu...@gmail.com
> > > wrote:
> > 
> > > Hi All,
> > > We have a use case to cancel the already running DAG. So is there any
> > > recommended way to do so.
> > >
> > > Thanks,
> > > Raman
> > >
> > 
> > >>>
> > >>
> >
> 


Task get stuck in up_for_retry state

2018-04-23 Thread ramandumcs
Hi ,
I am providing "retries" and "retry_delay" as arguments to one of my operators in
the DAG, but the corresponding task gets stuck in the "up_for_retry" state and is
not retried by the scheduler. retry_delay is set to timedelta(seconds=5), so it
should be retried after 5 seconds.
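For reference, this is roughly how the arguments are being passed (operator and
values are illustrative). As far as we understand, a retry only actually starts
when the scheduler next picks the task up, so very small retry_delay values are
effectively bounded below by the scheduler's own loop interval:

from datetime import timedelta
from airflow.operators.bash_operator import BashOperator

task = BashOperator(
    task_id='flaky_task',
    bash_command='exit 1',            # always fails, to exercise the retry path
    retries=2,
    retry_delay=timedelta(seconds=5),
    dag=dag,                          # assumes an existing DAG object
)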
Thanks,
Raman Gupta


Configuration passing through airflow cli (trigger_dag)

2018-05-02 Thread ramandumcs
Hi,
I need to pass certain arguments to my custom operator at run time. It seems that
the airflow CLI's trigger_dag
command supports passing conf at run time, which is referred to in the operator's
execute function through the {{ dag_run.conf['name'] }} template.
But I am not able to read the "name" conf inside the operator.
Am I missing something here?
Any pointer on this would help me move forward.
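A minimal sketch of what we are trying (DAG id, key and command are illustrative).
The templated argument has to be listed in the operator's template_fields, and
dag_run.conf is only populated for runs started via trigger_dag with -c:

# CLI:  airflow trigger_dag my_dag -c '{"name": "foo"}'

from airflow.operators.bash_operator import BashOperator

t = BashOperator(
    task_id='echo_name',
    # bash_command is a templated field, so the macro is rendered at run time
    bash_command='echo {{ dag_run.conf["name"] }}',
    dag=dag,                          # assumes an existing DAG object
)

# inside a custom operator the same value is available without templating:
#     def execute(self, context):
#         name = context['dag_run'].conf.get('name')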

Thanks,
Raman Gupta


Template for Operator' Json Argument

2018-05-05 Thread ramandumcs
Hi All,

We have implemented a custom operator derived from BaseOperator. The custom
operator takes a JSON argument. Some fields of this JSON are strings and others
are integers. We need to templatise the string fields only, not the integers, but
on doing this we get the error:
airflow.exceptions.AirflowException: Type '' used for parameter
'json[number]' is not supported for templating.

So is it possible to have both strings and integers in a templatised field?
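One workaround we are considering (a sketch; the class and field names are
hypothetical) is to declare the templated field as a single JSON string and parse
it in execute, so mixed types never reach the template renderer directly:

import json
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class MyJsonOperator(BaseOperator):
    template_fields = ('json_str',)   # template the whole payload as one string

    @apply_defaults
    def __init__(self, json_str, *args, **kwargs):
        super(MyJsonOperator, self).__init__(*args, **kwargs)
        # e.g. json_str='{"name": "{{ ds }}", "number": 5}'
        self.json_str = json_str

    def execute(self, context):
        payload = json.loads(self.json_str)   # integers come back as ints here
        return payload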

Thanks,
Raman Gupta


Airflow Scheduler Crashing

2018-05-09 Thread ramandumcs
Hi All,

We are running Airflow version 1.9 in LocalExecutor mode. We are observing that the
scheduler crashes after a few hours with the stack trace below. (It seems to be an
issue with the MySQL connection; is there any fix or workaround for this?)
Traceback (most recent call last):
  File "/usr/src/venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 
371, in helper
pickle_dags)
  File "/usr/src/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", 
line 50, in wrapper
result = func(*args, **kwargs)
  File "/usr/src/venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 
1762, in process_file
dag.sync_to_db()
  File "/usr/src/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", 
line 50, in wrapper
result = func(*args, **kwargs)
  File "/usr/src/venv/local/lib/python2.7/site-packages/airflow/models.py", 
line 3806, in sync_to_db
DagModel).filter(DagModel.dag_id == self.dag_id).first()
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 
2825, in first
ret = list(self[0:1])
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 
2617, in __getitem__
return list(res)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 
2925, in __iter__
return self._execute_and_instances(context)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 
2946, in _execute_and_instances
close_with_result=True)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 
2955, in _get_bind_args
**kw
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 
2937, in _connection_from_session
conn = self.session.connection(**kw)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
line 1035, in connection
execution_options=execution_options)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
line 1040, in _connection_for_bind
engine, execution_options)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", 
line 409, in _connection_for_bind
conn = bind.contextual_connect()
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
line 2123, in contextual_connect
self._wrap_pool_connect(self.pool.connect, None),
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", 
line 2158, in _wrap_pool_connect
return fn()
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 403, in connect
return _ConnectionFairy._checkout(self)
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 788, in _checkout
fairy = _ConnectionRecord.checkout(pool)
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 532, in checkout
rec = pool._do_get()
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 1193, in _do_get
self._dec_overflow()
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py",
 line 66, in __exit__
compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 1190, in _do_get
return self._create_connection()
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 350, in _create_connection
return _ConnectionRecord(self)
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 477, in __init__
self.__connect(first_connect_check=True)
  File "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/pool.py", 
line 671, in __connect
connection = pool._invoke_creator(self)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py",
 line 106, in connect
return dialect.connect(*cargs, **cparams)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/default.py", 
line 410, in connect
return self.dbapi.connect(*cargs, **cparams)
  File "/usr/src/venv/local/lib/python2.7/site-packages/MySQLdb/__init__.py", 
line 86, in Connect
return Connection(*args, **kwargs)
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 
209, in __init__
self._server_version = tuple([ numeric_part(n) for n in 
self.get_server_info().split('.')[:2] ])
  File 
"/usr/src/venv/local/lib/python2.7/site-packages/MySQLdb/connections.py", line 
69, in numeric_part
m = re_numeric_part.match(s)
  File "/usr/src/venv/local/lib/python2.7/site-packages/airflow/bin/cli.py", 
line 73, in sigint_handler
sys.exit(0)
SystemExit: 0


Tasks are marked as zombie

2018-05-11 Thread ramandumcs
Hi All,
We are facing an issue in which tasks are marked as zombies. We are running
Airflow in LocalExecutor mode and there is not much load on the machine or on the
MySQL server.
There is one config which seems to change this behaviour:

# Local task jobs periodically heartbeat to the DB. If the job has
# not heartbeat in this many seconds, the scheduler will mark the
# associated task instance as failed and will re-schedule the task.
scheduler_zombie_task_threshold = 300

Are there any other config(s) that can be used to change this behaviour?
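For reference, the related entries in airflow.cfg (values shown are the defaults we
are running with; illustrative only):

[scheduler]
# how often a running local task job heartbeats to the DB
job_heartbeat_sec = 5
# tasks without a heartbeat for this many seconds are treated as zombies
scheduler_zombie_task_threshold = 300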

Thanks,
Raman Gupta


Dags getting failed after 24 hours

2018-05-21 Thread ramandumcs
Hi All,
We have a long-running DAG which is expected to take around 48 hours, but we are
observing that it gets killed by the Airflow scheduler after ~24 hrs. We are not
setting any DAG/task execution timeout explicitly.
Is there any default timeout value that gets used? We are using LocalExecutor
mode.
We checked the Airflow code, but the execution timeout values seem to be set to
'None'.
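In the meantime we are considering setting the limits explicitly rather than relying
on defaults; a sketch (ids and durations are illustrative):

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(
    dag_id='long_running_dag',
    start_date=datetime(2018, 5, 1),
    schedule_interval=None,
    dagrun_timeout=timedelta(hours=72),      # limit for the whole DAG run
)

t = BashOperator(
    task_id='long_task',
    bash_command='sleep 60',
    execution_timeout=timedelta(hours=72),   # per-task limit
    dag=dag,
)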

Thanks,
Raman Gupta


Scheduler gets slow with increasing number of Dags

2018-05-23 Thread ramandumcs
Hi All,

We have a use case where there are 100s of DAGs in the scheduler's local DAG
folder, but only ~50 DAGs are active at a time (the other DAGs are disabled). New
DAGs keep being added to the local DAG folder.
We are observing that the scheduler is taking a lot of time (around 20 minutes) to
pick up those newly created DAGs.
We are using the following configuration:
Machine config: 8 core, 16 GB RAM
Executor: LocalExecutor
Parallelism: 50
max_thread: 4,
Dir list interval: 60.

Is there any recommended setting or config to optimise the scheduler's DAG
listing and parsing?
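For completeness, the [scheduler] knobs as they appear in airflow.cfg
(dag_dir_list_interval and max_threads match the values listed above;
min_file_process_interval is shown with its default and is an assumption on our
part):

[scheduler]
# seconds between scans of the DAGs folder for new files
dag_dir_list_interval = 60
# minimum seconds between two parses of the same file (0 = no throttling)
min_file_process_interval = 0
# number of parallel DAG-file-processing processes
max_threads = 4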

Thanks,
Raman


Disable Processing of DAG file

2018-05-28 Thread ramandumcs
Hi All,

We have a use case where there will be 100(s) of DAG files with the schedule set
to "@once". Currently it seems that the scheduler processes each and every file
and creates a DAG object.
Is there a way or config to tell the scheduler to stop processing certain files?
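For context, Airflow does support an .airflowignore file in the DAGS_FOLDER: each
line is a regular expression, and matching file paths are skipped by the
scheduler's scan. It is static rather than per-DAG, though. A sketch (patterns are
illustrative):

# DAGS_FOLDER/.airflowignore
archived/.*
one_shot_dag_.*\.py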

Thanks,
Raman Gupta


Re: Disable Processing of DAG file

2018-05-30 Thread ramandumcs
Thanks Maxime,
We have 100(s) of DAGs with the schedule set to @once, and new DAGs keep coming
into the system.
The scheduler processes each and every DAG inside the local DAG folder. Each DAG
file takes around 400 milliseconds to process, and we have set max_threads to 8
(as we have an 8-core machine), i.e. 8 DAG files are processed in parallel.
In the processing loop the scheduler also sleeps for 1 second
(_processor_poll_interval is set to 1), so effectively it processes 8 files every
second.
So every DAG file gets processed after ~(number of DAGs / 8) seconds, and the
scheduler's latency for picking up a new DAG file increases with the number of
DAGs.

-Raman Gupta

On 2018/05/30 06:08:55, Maxime Beauchemin  wrote: 
> The TLDR of how the processor works is:
> 
> while True:
> * sets a multiprocessing queue with N processes (say 32)
> * main process looks for the list of all .py files in DAGS_FOLDER
> * fills in the queue with all .py
> * each one of the 32 suprocess opens a file and interprets it (it's
> insulated from the main process, a sys.exit() wouldn't affect the main
> process), looks for DAG object in module namespace
> * if it finds a DAG object, it looks for active DAG runs, and creates new
> DAG runs if a new schedule is ready to start
> * for each active DAG Run, it looks at all "runable" tasks and looks to see
> if dependencies have been met
> * returns a list of all tasks ready to get triggered to main process
> * main process wait for a certain specified amount of time, accumulates
> task instance list that are all ready to run
> * the scheduling train leaves the station, prioritize tasks based
> priority_weight and schedules where pool slots are availlable
> * other supervisor-type tasks, like handling zombie tasks and such
> 
> A long long time ago we didn't have subprocesses and things like a DAG with
> a `sys.exit()` would crash the scheduler, and modules imported in DAGs
> files would get cached in `sys.modules` unless you'd force
> `reload(my_submodule)`. There was (and still is) a flag on the scheduler
> CLI command to force it to exit after a certain number of runs so that your
> service would restart it in a loop and flush sys.modules .  But those days
> are long gone, and there's no reason to do this anymore.
> 
> Max
> 
> 
> On Mon, May 28, 2018 at 11:29 PM Ruiqin Yang  wrote:
> 
> > Hi folks,
> > This config line
> > <
> > https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/default_airflow.cfg#L414
> > >
> > controls how often the scheduler scan the DAG folder and tries to discover/
> > forget DAGs.
> >
> > For doing dag file processing part, scheduler does parse the DAG file
> > everytime before it schedules tasks through DagFileProcessor.
> >
> > Cheers,
> > Kevin Y
> >
> > On Mon, May 28, 2018 at 10:14 PM, Ananth Durai 
> > wrote:
> >
> > > It is an interesting question. On a slightly related note, Correct me if
> > > I'm wrong, AFAIK we require restarting airflow scheduler in order pick
> > any
> > > new DAG file changes by the scheduler. In that case, should the scheduler
> > > do the DAGFileProcessing every time before scheduling the tasks?
> > >
> > > Regards,
> > > Ananth.P,
> > >
> > >
> > >
> > >
> > >
> > >
> > > On 28 May 2018 at 21:46, ramandu...@gmail.com 
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > We have a use case where there would be 100(s) of DAG files with
> > schedule
> > > > set to "@once". Currently it seems that scheduler processes each and
> > > every
> > > > file and creates a Dag Object.
> > > > Is there a way or config to tell scheduler to stop processing certain
> > > > files.
> > > >
> > > > Thanks,
> > > > Raman Gupta
> > > >
> > >
> >
> 




Re: Single Airflow Instance Vs Multiple Airflow Instance

2018-06-07 Thread ramandumcs
We have a similar use case where we need to support multiple teams, and the
expected load is 1000(s) of active TIs. We are exploring setting up a separate
Airflow cluster for each team and scaling each cluster horizontally through the
Celery executor.
@Ruiqin, could you please share some details on the Airflow setup, like the
Airflow version, machine configuration, Airflow cfg settings, etc.?
How can we configure infinity (0) for the cluster-wide setting? (We are using
Airflow v1.9, and it seems that
airflow cfg's parallelism = 0 is not supported in v1.9.)

On 2018/06/07 22:27:20, Ruiqin Yang  wrote: 
> Here to provide a datapoint from Airbnb--all users share the same cluster
> (~8k active DAGs and ~15k running tasks at peak).
> 
> For the cluster-wide concurrency setting, we put infinity( 0) there and
> scale up on the # of workers if we need more worker slot.
> 
> For the scheduler & Airflow UI coupling, I believe Airflow UI is not
> coupled with the scheduler. Actually in Airbnb we couple airflow worker and
> airflow webserver together on the same EC2 instance--but you can always
> have a set of instances only hosting webservers.
> 
> If you have some critical users that don't want their DAG affected by
> changes from other users( adhoc new DAGs/tasks), you can probably set up
> dedicated celery queue( assuming you are using celery executor, local
> executor is in theory not for production) for the user, or, you can enforce
> DAG level concurrency( maybe a CI or through policy
> --which
> I'm not sure is a good practice since it is more for task level attributes).
> 
> With the awesome RBAC change in place, I think it make sense to share the
> same cluster, easier maintenance, less user confusion, etc.
> 
> Cheers,
> Kevin Y
> 
> On Thu, Jun 7, 2018 at 1:59 PM Ananth Durai  wrote:
> 
> > At Slack, We follow a similar pattern of deploying multiple airflow
> > instances. Since the Airflow UI & the scheduler coupled, it introduces
> > friction as the user need to know underlying deployment strategy. (like
> > which Airflow URL I should visit to see my DAGs, multiple teams
> > collaborating on the same DAG, pipeline operations, etc.)
> >
> > In one of the forum question, max mentioned renaming the scheduler to
> > supervisor as the scheduler do more than just scheduling.
> > It would be super cool if we can make multiple supervisors share the same
> > airflow metadata storage and the Airflow UI. (maybe introducing a unique
> > config param `supervisor.id` for each instance)
> >
> > The approach will help us to scale Airflow scheduler horizontally and while
> > keeping the simplicity from the user perspective.
> >
> >
> > Regards,
> > Ananth.P,
> >
> >
> >
> >
> >
> >
> > On 7 June 2018 at 04:08, Arturo Michel 
> > wrote:
> >
> > > We have had up to 50 dags with multiple tasks each. Many of them run in
> > > parallel, we've had some issues with compute as it was meant to be a
> > > temporary deployment but somehow it's now the permanent production one
> > and
> > > resources are not great.
> > > Oranisationally it is very similar to what Gerard described. More than
> > one
> > > group working with different engineering practices and standards, this is
> > > probably one of the sources of problems.
> > >
> > > -Original Message-
> > > From: Gerard Toonstra 
> > > Sent: Wednesday, June 6, 2018 5:02 PM
> > > To: dev@airflow.incubator.apache.org
> > > Subject: Re: Single Airflow Instance Vs Multiple Airflow Instance
> > >
> > > We are using two cluster instances. One cluster is for the engineering
> > > teams that are in the "tech" wing and which rigorously follow tech
> > > principles, the other instance is for use by business analysts and more
> > > ad-hoc, experimental work, who do not necessarily follow the principles.
> > We
> > > have a nomad engineer helping out the ad-hoc cluster, setting it up,
> > > connecting it to all systems and resolving programming questions. All
> > > clusters are fully puppetized, so we reuse configs and ways how things
> > are
> > > configured, plus have a common "platform code" package that is reused
> > > across both clusters.
> > >
> > > G>
> > >
> > >
> > > On Wed, Jun 6, 2018 at 5:50 PM, James Meickle 
> > > wrote:
> > >
> > > > An important consideration here is that there are several settings
> > > > that are cluster-wide. In particular, cluster-wide concurrency
> > > > settings could result in Team B's DAG refusing to schedule based on an
> > > error in Team A's DAG.
> > > >
> > > > Do your teams follow similar practices in how eagerly they ship code,
> > > > or have similar SLAs for resolving issues? If so, you are probably
> > > > fine using co-tenancy. If not, you should probably talk about it first
> > > > to make sure the teams are okay with co-tenancy.
> > > >
> > > > On Wed, Jun 6, 2018 at 11:24 AM, gauthiermarti...@gmail.com <
> > > > gauthiermarti...@gmail.com> wrote:
> > > >
> > > > > Hi Everyone,

Re: Single Airflow Instance Vs Multiple Airflow Instance

2018-06-08 Thread ramandumcs
Thanks Kevin,
I am specifically interested in scheduler settings
like scheduler_zombie_task_threshold and max_tis_per_query.
We are expecting load in the range of 1000(s) of concurrent DAGs, so any Airflow
setting which might help us achieve this target would be useful.
There will be 1000(s) of local DAG files with the schedule set to @once.



On 2018/06/08 05:13:39, Ruiqin Yang  wrote: 
> Not sure about 1.9 but parallelism seems to be supported on master
> .
> We are using 1.8 with some bug fixing cherry-picks. The machine is just out
> of the box AWS EC2 instances. We've been using I3 for scheduler and R3 for
> worker, but I urge you to checkout the new generations which are more
> powerful and cheaper. As always, you may pick the best series by profile
> your machine usage( on I/O, ram, cpu, etc). I don't think we've tuned too
> much on the default Airflow settings and the best setting for you guys
> should be different that the one best for us( that being said, I can
> provide some more details when I'm back to the office if you are curious on
> some particular settings).
> 
> Cheers,
> Kevin Y
> 
> On Thu, Jun 7, 2018 at 9:02 PM ramandu...@gmail.com 
> wrote:
> 
> > We have similar use case where we need to support multiple teams and
> > expected load is 1000(s) active TIs. We are exploring setting up multiple
> > airflow cluster on for each team and scale that cluster horizontally
> > through   celery executor.
> > @Ruiquin could you please share some details on airflow setup like
> > Airflow Version, Machine configuration, Airflow cfg settings etc..
> > How can we configure infinity(0) for cluster-wide setting. (We are using
> > airflow v1.9 and it seems that
> > airflow cfg's parallelism = 0 is not supported in v1.9)
> >
> > On 2018/06/07 22:27:20, Ruiqin Yang  wrote:
> > > Here to provide a datapoint from Airbnb--all users share the same cluster
> > > (~8k active DAGs and ~15k running tasks at peak).
> > >
> > > For the cluster-wide concurrency setting, we put infinity( 0) there and
> > > scale up on the # of workers if we need more worker slot.
> > >
> > > For the scheduler & Airflow UI coupling, I believe Airflow UI is not
> > > coupled with the scheduler. Actually in Airbnb we couple airflow worker
> > and
> > > airflow webserver together on the same EC2 instance--but you can always
> > > have a set of instances only hosting webservers.
> > >
> > > If you have some critical users that don't want their DAG affected by
> > > changes from other users( adhoc new DAGs/tasks), you can probably set up
> > > dedicated celery queue( assuming you are using celery executor, local
> > > executor is in theory not for production) for the user, or, you can
> > enforce
> > > DAG level concurrency( maybe a CI or through policy
> > > <
> > https://github.com/apache/incubator-airflow/blob/master/airflow/settings.py#L109
> > >--which
> > > I'm not sure is a good practice since it is more for task level
> > attributes).
> > >
> > > With the awesome RBAC change in place, I think it make sense to share the
> > > same cluster, easier maintenance, less user confusion, etc.
> > >
> > > Cheers,
> > > Kevin Y
> > >
> > > On Thu, Jun 7, 2018 at 1:59 PM Ananth Durai  wrote:
> > >
> > > > At Slack, We follow a similar pattern of deploying multiple airflow
> > > > instances. Since the Airflow UI & the scheduler coupled, it introduces
> > > > friction as the user need to know underlying deployment strategy. (like
> > > > which Airflow URL I should visit to see my DAGs, multiple teams
> > > > collaborating on the same DAG, pipeline operations, etc.)
> > > >
> > > > In one of the forum question, max mentioned renaming the scheduler to
> > > > supervisor as the scheduler do more than just scheduling.
> > > > It would be super cool if we can make multiple supervisors share the
> > same
> > > > airflow metadata storage and the Airflow UI. (maybe introducing a
> > unique
> > > > config param `supervisor.id` for each instance)
> > > >
> > > > The approach will help us to scale Airflow scheduler horizontally and
> > while
> > > > keeping the simplicity from the user perspective.
> > > >
> > > >
> > > > Regards,
> > > > Ananth.P,
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > On 7 June 2018 at 04:08, Arturo Michel 
> > > > wrote:
> > > >
> > > > > We have had up to 50 dags with multiple tasks each. Many of them run
> > in
> > > > > parallel, we've had some issues with compute as it was meant to be a
> > > > > temporary deployment but somehow it's now the permanent production
> > one
> > > > and
> > > > > resources are not great.
> > > > > Oranisationally it is very similar to what Gerard described. More
> > than
> > > > one
> > > > > group working with different engineering practices and standards,
> > this is
> > > > > probably one of the sources of problems.
> > > > >
> > > > > -Original Me

Concurrency Settings for Celery Executor

2018-06-13 Thread ramandumcs
Hi All,

There seem to be a couple of settings in airflow.cfg which control the number of
tasks that can run in parallel on Airflow ("parallelism" and
"celeryd_concurrency").
In the case of the CeleryExecutor, which one is honoured? Do we need to set both,
or would setting only celeryd_concurrency work?

Thanks,
Raman



Re: Concurrency Settings for Celery Executor

2018-06-21 Thread ramandumcs
Thanks George.
On 2018/06/20 20:06:38, George Leslie-Waksman  
wrote: 
> "celeryd_concurrency" and "parallelism" serve different purposes
> 
> "celeryd_concurrency" determines how many worker subprocesses will be spun
> up on an airflow worker instance (how many concurrent tasks per machine)
> 
> "parallelism" determines how many tasks airflow will schedule at once;
> running and queued tasks count against this limit
> 
> Generally, you will want to set "celeryd_concurrency" to as high a value as
> your worker nodes can handle (or a lower value and use low powered workers)
> 
> As a general rule of thumb, you will want to set "parallelism" to be
> (number of workers) * ("celeryd_concurrency"). A lower "parallelism" value
> will leave workers idle. If you increase "parallelism", you will get tasks
> backed up in your celery queue; this is great for high throughput and makes
> sure workers aren't starved for work but can slightly increase the latency
> of individual tasks.
> 
> Once you have those values tuned, you also need to be aware of dag
> concurrency limits and task pool limits. Each DAG has a concurrency
> parameter, which defaults to the "dag_concurrency" configuration value.
> Each Task has a pool parameter, which defaults to `None`. These each
> decrease the number of running tasks below the parallelism limit.
> 
> DAG concurrency behaves exactly the same as parallelism but sets a limit on
> the individual DAG. Depending on how many DAGs you have, when they're
> scheduled, etc., you will want to set the "dag_concurrency" to a level that
> doesn't prevent DAGs from stepping on each others' resources too much. You
> can also set individual limits for individual DAGs.
> 
> Pools also behave the same as parallelism but operate across DAGs. They are
> useful for things like limiting the number of concurrent tasks hitting a
> single database. If you do not specify a pool, the tasks will be placed in
> the default pool, which has a size defined by
> the "non_pooled_task_slot_count" configuration variable.
> 
> If you don't want to think too much about things, try:
> 
> celeryd_concurrency = 4 or 8
> parallelism = celeryd_concurrency * number of workers
> num_pooled_task_slot_count = parallelism
> dag_concurrency = parallelism / 2
> 
> and then pretend pools and individual dag concurrency limits don't exist
> 
> On Wed, Jun 13, 2018 at 4:11 AM ramandu...@gmail.com 
> wrote:
> 
> > Hi All,
> >
> > There seems to be couple of settings in airflow.cfg which controls the
> > number of tasks that can run in parallel on Airflow( "parallelism" and
> > "celeryd_concurrency")
> > In case of celeryExecutor which one is honoured. Do we need to set both or
> > setting only  celeryd_concurrency would work.
> >
> > Thanks,
> > Raman
> >
> >
> 
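A worked example of George's rule of thumb, written as an airflow.cfg sketch
(numbers are purely illustrative, assuming 10 Celery worker nodes):

[celery]
celeryd_concurrency = 8

[core]
parallelism = 80                  # workers * celeryd_concurrency
dag_concurrency = 40              # parallelism / 2
non_pooled_task_slot_count = 80   # = parallelism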


Scheduler crashed due to mysql connectivity errors

2018-06-28 Thread ramandumcs
Hi All,
We are using Airflow 1.9 and are observing scheduler crashes due to MySQL
connectivity issues,
like
"scheduler is crashed because of OperationalError:
(_mysql_exceptions.OperationalError) (2013, 'Lost connection to MySQL server
during query') (Background on this error at: http://sqlalche.me/e/e3q8)"
Is there a way or config setting to make the scheduler more resilient to MySQL
errors?

Thanks,
Raman Gupta


Scheduler not honouring non_pooled_task_slot_count config

2018-07-09 Thread ramandumcs
We are using Airflow version 1.9 with the Celery executor, and we are observing
that the Airflow scheduler is not honouring the "non_pooled_task_slot_count"
config. We are using the default setting, which is 128, but we could
schedule and run >128 tasks concurrently.
From the code it seems that the scheduler is re-initialising open_slots with 128
instead of setting it to the remaining leftover slots.
In jobs.py:
for pool, task_instances in pool_to_task_instances.items():
    if not pool:
        # Arbitrary:
        # If queued outside of a pool, trigger no more than
        # non_pooled_task_slot_count per run
        open_slots = conf.getint('core', 'non_pooled_task_slot_count')
Thanks,
Raman Gupta


Airflow support for Kubernetes Executor

2018-07-18 Thread ramandumcs
Will the 1.10 release have support for the Kubernetes Executor?

Thanks,
Raman


Task is Stuck in Up_For_Retry

2018-08-16 Thread ramandumcs
Hi All,

We are using Airflow 1.9 in LocalExecutor mode. Intermittently we are
observing that tasks get stuck in the "up_for_retry" state and are retried again
and again, exceeding their configured max retries count. For example, we have
configured max retries as 2, but a task was retried 15 times and got stuck in the
up_for_retry state.
Any pointer on this would be helpful.

Thanks,
Raman Gupta


Re: Task is Stuck in Up_For_Retry

2018-08-17 Thread ramandumcs
Thanks Taylor,
We are getting this issue even after a restart. We are observing that the task
instance state transitions from
scheduled -> queued -> up_for_retry, and the DAG gets stuck in the up_for_retry
state. Behind the scenes the executor keeps retrying the DAG's task, exceeding the
max retry limit.
In the normal scenario the task state should have the following transitions:
scheduled -> queued -> running -> up_for_retry,
but we are seeing that the task does not enter the running state; rather, it moves
directly to up_for_retry from the queued state.
Any pointer on this would be helpful.

Thanks,
raman Gupta


On 2018/08/16 16:05:31, Taylor Edmiston  wrote: 
> Does a scheduler restart make a difference?
> 
> *Taylor Edmiston*
> Blog  | CV
>  | LinkedIn
>  | AngelList
>  | Stack Overflow
> 
> 
> 
> On Thu, Aug 16, 2018 at 4:25 AM, ramandu...@gmail.com 
> wrote:
> 
> > Hi All,
> >
> > We are using airflow 1.9 with Local Executor more. Intermittently we are
> > observing that tasks are getting stuck in "up_for_retry" mode and are
> > getting retried again and again exceeding their configured max retries
> > count. like we have configured max retries as 2 but task is retried 15
> > times and got stuck in up_for_retry state.
> > Any pointer on this would be helpful.
> >
> > Thanks,
> > Raman Gupta
> >
> 


Re: Task is Stuck in Up_For_Retry

2018-08-17 Thread ramandumcs
We are getting the logs like

{local_executor.py:43} INFO - LocalWorker running airflow run 
 {models.py:1595} ERROR - Executor reports task instance %s finished (%s) 
although the task says its %s. Was the task killed externally?
{models.py:1616} INFO - Marking task as UP_FOR_RETRY

It seems that in LocalExecutor.py the subprocess.check_call command returns
successfully without throwing an error, but the task state does not get updated
from queued to running in the MySQL store.
One possibility is that the "_check_and_change_state_before_execution" function in
models.py is returning false, due to which the task state is not updated to the
"running" state and the task gets stuck in up_for_retry in the handle_failure
function.

The other possibility could be unavailability of system resources to create a new
process inside the subprocess.check_call call, but I think in that case it would
throw an error.

Is there a way to enable the "verbose" logging flag in the
_check_and_change_state_before_execution call? It would help us debug this
further.

Thanks,
Raman Gupta


On 2018/08/17 15:22:27, Matthias Huschle  wrote: 
> Hi Raman,
> 
> Does it happen only occasionally, or can it be easily reproduced?
> What happens if you start it with "airflow run" or " airflow test"?
> What is in the logs about it?
> What is your user process limit ("ulimit -u") on that machine?
> 
> 
> 2018-08-17 15:39 GMT+02:00 ramandu...@gmail.com :
> 
> > Thanks Taylor,
> > We are getting this issue even after restart. We are observing that task
> > instance state is transitioned from
> > scheduled->queued->up_for_retry and dag gets stuck in up_for_retry state.
> > Behind the scenes executor keep on retrying the dag's task exceeding the
> > max retry limit.
> > In normal scenario task state should have following transition
> > scheduled->queued->running->up_for_retry
> > but we are seeing task is not entered in to running state rather it moves
> > directly to up_for_retry from queued state.
> > Any pointer on this would be helpful.
> >
> > Thanks,
> > raman Gupta
> >
> >
> > On 2018/08/16 16:05:31, Taylor Edmiston  wrote:
> > > Does a scheduler restart make a difference?
> > >
> > > *Taylor Edmiston*
> > > Blog  | CV
> > >  | LinkedIn
> > >  | AngelList
> > >  | Stack Overflow
> > > 
> > >
> > >
> > > On Thu, Aug 16, 2018 at 4:25 AM, ramandu...@gmail.com <
> > ramandu...@gmail.com>
> > > wrote:
> > >
> > > > Hi All,
> > > >
> > > > We are using airflow 1.9 with Local Executor more. Intermittently we
> > are
> > > > observing that tasks are getting stuck in "up_for_retry" mode and are
> > > > getting retried again and again exceeding their configured max retries
> > > > count. like we have configured max retries as 2 but task is retried 15
> > > > times and got stuck in up_for_retry state.
> > > > Any pointer on this would be helpful.
> > > >
> > > > Thanks,
> > > > Raman Gupta
> > > >
> > >
> >
> 
> 
> 
> -- 
> 
> *< Dr. Matthias Huschle />*
> 
> Data and Analytics Manager
> 
> 
> 
> 
> 
> E: matthias.husc...@paymill.de
> 


Airflow Meetup in India

2018-08-20 Thread ramandumcs
Hi All,
Do we have Airflow meetup(s) in India? We are based out of India and are using
Apache Airflow as an orchestration engine to author and manage 1000(s) of
multi-step workflows.
We would be interested in joining/conducting an Airflow meetup in India.

Thanks,
Raman Gupta   



Re: Airflow Meetup in India

2018-08-21 Thread ramandumcs
Thanks Sumit,
We work for Adobe and will try to hold it in Bangalore, based on participation.
Would it be possible for you to share the poll link?

Thanks,
Raman Gupta
   
On 2018/08/21 06:38:37, Sumit Maheshwari  wrote: 
> Hi Raman,
> 
> Folks from Qubole certainly join/talk if the meetup is happening in
> Bangalore. However, sometime back I did a poll about the same and got only
> 7 responses, so have dropped the idea of conducting.
> 
> Let us know if you get enough traction this time.
> 
> Thanks,
> Sumit
> 
> On Tue, Aug 21, 2018 at 10:28 AM ramandu...@gmail.com 
> wrote:
> 
> > Hi All,
> > Do we have airflow meetup(s) in India. We are based out of india and are
> > using Apache Airflow as an orchestration engine to author and manage
> > 1000(s) of multi-step workflows.
> > Would be interested in joining/conducting Airflow Meetup in India.
> >
> > Thanks,
> > Raman Gupta
> >
> >
> 


Re: Task is Stuck in Up_For_Retry

2018-08-21 Thread ramandumcs
Hi All,
As per http://docs.sqlalchemy.org/en/latest/core/connections.html, a DB engine is
not portable across process boundaries:
"For a multiple-process application that uses the os.fork system call, or for 
example the Python multiprocessing module, it’s usually required that a 
separate Engine be used for each child process. This is because the Engine 
maintains a reference to a connection pool that ultimately references DBAPI 
connections - these tend to not be portable across process boundaries"
Please correct me if I am wrong, but it seems that in Airflow 1.9 the child
processes don't create a separate DB engine, so there is a single DB engine shared
among child processes, which might be causing this issue.
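A minimal sketch of the pattern the SQLAlchemy docs describe, i.e. giving each
forked child its own connection pool (the connection URL is illustrative):

import multiprocessing
from sqlalchemy import create_engine

engine = create_engine('mysql://user:pass@host/airflow_db')

def child_work():
    # drop the pool inherited over the fork; the child then opens fresh
    # DBAPI connections instead of reusing the parent's
    engine.dispose()
    with engine.connect() as conn:
        conn.execute('SELECT 1')

if __name__ == '__main__':
    p = multiprocessing.Process(target=child_work)
    p.start()
    p.join()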

Thanks,
Raman Gupta 


On 2018/08/21 15:41:14, raman gupta  wrote: 
> One possibility is the unavailability of session while calling
> self.task_instance._check_and_change_state_before_execution
> function.
> (Session is provided via @provide_session decorator)
> 
> On Tue, Aug 21, 2018 at 7:09 PM vardangupta...@gmail.com <
> vardangupta...@gmail.com> wrote:
> 
> > Is there any possibility that on call of function
> > _check_and_change_state_before_execution at
> > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/jobs.py#L2500,
> > this method is not actually being called
> > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/models.py#L1299?
> > Because even in a happy scenario, no logs is printed from method's
> > implementation and directly control is reaching here
> > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/jobs.py#L2512
> > while in stuck phase, we are seeing this log
> > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/jobs.py#L2508
> > i.e. Task is not able to be run, FYI we've not set any sort of dependency
> > with dag.
> >
> > Regards,
> > Vardan Gupta
> >
> > On 2018/08/16 08:25:37, ramandu...@gmail.com 
> > wrote:
> > > Hi All,
> > >
> > > We are using airflow 1.9 with Local Executor more. Intermittently we are
> > observing that tasks are getting stuck in "up_for_retry" mode and are
> > getting retried again and again exceeding their configured max retries
> > count. like we have configured max retries as 2 but task is retried 15
> > times and got stuck in up_for_retry state.
> > > Any pointer on this would be helpful.
> > >
> > > Thanks,
> > > Raman Gupta
> > >
> >
> 


Re: Task is Stuck in Up_For_Retry

2018-08-24 Thread ramandumcs
Hi All,
Any pointer on this would be helpful. We have added extra logs and are trying a
few things to find the root cause, but we are only getting logs like "Task is not
able to run",
and we are not getting any resource-usage-related errors.

Thanks,
Raman Gupta  

On 2018/08/21 16:46:56, ramandu...@gmail.com  wrote: 
> Hi All,
> As per http://docs.sqlalchemy.org/en/latest/core/connections.html link db 
> engine is not portable across process boundaries
> "For a multiple-process application that uses the os.fork system call, or for 
> example the Python multiprocessing module, it’s usually required that a 
> separate Engine be used for each child process. This is because the Engine 
> maintains a reference to a connection pool that ultimately references DBAPI 
> connections - these tend to not be portable across process boundaries"
> Please correct me if I am wrong but It seems that in Airflow 1.9 child 
> processes don't create separate DB engine and so there is only one single DB 
> Engine which is shared among child processes which might be causing this 
> issue.
> 
> Thanks,
> Raman Gupta 
> 
> 
> On 2018/08/21 15:41:14, raman gupta  wrote: 
> > One possibility is the unavailability of session while calling
> > self.task_instance._check_and_change_state_before_execution
> > function.
> > (Session is provided via @provide_session decorator)
> > 
> > On Tue, Aug 21, 2018 at 7:09 PM vardangupta...@gmail.com <
> > vardangupta...@gmail.com> wrote:
> > 
> > > Is there any possibility that on call of function
> > > _check_and_change_state_before_execution at
> > > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/jobs.py#L2500,
> > > this method is not actually being called
> > > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/models.py#L1299?
> > > Because even in a happy scenario, no logs is printed from method's
> > > implementation and directly control is reaching here
> > > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/jobs.py#L2512
> > > while in stuck phase, we are seeing this log
> > > https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/jobs.py#L2508
> > > i.e. Task is not able to be run, FYI we've not set any sort of dependency
> > > with dag.
> > >
> > > Regards,
> > > Vardan Gupta
> > >
> > > On 2018/08/16 08:25:37, ramandu...@gmail.com 
> > > wrote:
> > > > Hi All,
> > > >
> > > > We are using airflow 1.9 with Local Executor more. Intermittently we are
> > > observing that tasks are getting stuck in "up_for_retry" mode and are
> > > getting retried again and again exceeding their configured max retries
> > > count. like we have configured max retries as 2 but task is retried 15
> > > times and got stuck in up_for_retry state.
> > > > Any pointer on this would be helpful.
> > > >
> > > > Thanks,
> > > > Raman Gupta
> > > >
> > >
> > 
> 


Re: Best Practice of Airflow Setting-Up & Usage

2018-09-06 Thread ramandumcs
Hi,
We have a requirement to scale to 1000(s) of concurrent DAGs. With the Celery
executor we observed that the
Airflow worker sometimes gets stuck if the connection to Redis/MySQL breaks
(https://github.com/celery/celery/issues/3932
https://github.com/celery/celery/issues/4457).
Currently we are using Airflow 1.9 with the LocalExecutor, but we are planning to
switch to Airflow 1.10 with the Kubernetes executor.

Thanks,
Raman Gupta


On 2018/09/05 12:56:38, Deng Xiaodong  wrote: 
> Hi folks,
> 
> May you kindly share how your organization is setting up Airflow and using
> it? Especially in terms of architecture. For example,
> 
> - *Setting-Up*: Do you install Airflow in a "one-time" fashion, or
> containerization fashion?
> - *Executor:* Which executor are you using (*LocalExecutor*,
> *CeleryExecutor*, etc)? I believe most production environments are using
> *CeleryExecutor*?
> - *Scale*: If using Celery, normally how many worker nodes do you add? (for
> sure this is up to workloads and performance of your worker nodes).
> - *Queue*: if Queue feature
>  is used in your
> architecture? For what advantage? (for example, explicitly assign
> network-bound tasks to a worker node whose parallelism can be much higher
> than its # of cores)
> - *SLA*: do you have any SLA for your scheduling? (this is inspired by
> @yrqls21's PR 3830 )
> - etc.
> 
> Airflow's setting-up can be quite flexible, but I believe there is some
> sort of best practice, especially in the organisations where scalability is
> essential.
> 
> Thanks for sharing in advance!
> 
> 
> Best regards,
> XD
> 


Re: Best Practice of Airflow Setting-Up & Usage

2018-09-06 Thread ramandumcs
Yes, we are seeing the scheduler become a bottleneck as the number of DAG files
increases, since the scheduler can only scale vertically, not horizontally.
We are trying multiple independent Airflow setups and distributing the load
between them,
but managing that many Airflow clusters is becoming a challenge.

Thanks,
Raman Gupta

On 2018/09/06 14:55:26, Deng Xiaodong  wrote: 
> Thanks for sharing, Raman.
> 
> Based on what you shared, I think there are two points that may be worth
> further discussing/thinking.
> 
> *Scaling up (given thousands of DAGs):*
> If you have thousands of DAGs, you may encounter longer scheduling latency
> (actual start time minus planned start time).
> For workers, we can scale horizontally by adding more worker nodes, which
> is relatively straightforward.
> But *Scheduler* may become another bottleneck.Scheduler can only be running
> on one node (please correct me if I'm wrong). Even if we can use multiple
> threads for it, it has its limit. HA is another concern. This is also what
> our team is looking into at this moment, since scheduler is the biggest
> "bottleneck" identified by us so far (anyone has experience tuning
> scheduler performance?).
> 
> *Broker for Celery Executor*:
> you may want to try RabbitMQ rather than Redis/SQL as broker? Actually the
> Celery community had the proposal to deprecate Redis as broker (of course
> this proposal was rejected eventually) [
> https://github.com/celery/celery/issues/3274].
> 
> 
> Regards,
> XD
> 
> 
> 
> 
> 
> On Thu, Sep 6, 2018 at 6:10 PM ramandu...@gmail.com 
> wrote:
> 
> > Hi,
> > We have a requirement to scale to run 1000(s) concurrent dags. With celery
> > executor we observed that
> > Airflow worker gets stuck sometimes if connection to redis/mysql breaks
> > (https://github.com/celery/celery/issues/3932
> > https://github.com/celery/celery/issues/4457)
> > Currently we are using Airflow 1.9 with LocalExecutor but planning to
> > switch to Airflow 1.10 with K8 Executor.
> >
> > Thanks,
> > Raman Gupta
> >
> >
> > On 2018/09/05 12:56:38, Deng Xiaodong  wrote:
> > > Hi folks,
> > >
> > > May you kindly share how your organization is setting up Airflow and
> > using
> > > it? Especially in terms of architecture. For example,
> > >
> > > - *Setting-Up*: Do you install Airflow in a "one-time" fashion, or
> > > containerization fashion?
> > > - *Executor:* Which executor are you using (*LocalExecutor*,
> > > *CeleryExecutor*, etc)? I believe most production environments are using
> > > *CeleryExecutor*?
> > > - *Scale*: If using Celery, normally how many worker nodes do you add?
> > (for
> > > sure this is up to workloads and performance of your worker nodes).
> > > - *Queue*: if Queue feature
> > >  is used in your
> > > architecture? For what advantage? (for example, explicitly assign
> > > network-bound tasks to a worker node whose parallelism can be much higher
> > > than its # of cores)
> > > - *SLA*: do you have any SLA for your scheduling? (this is inspired by
> > > @yrqls21's PR 3830 <
> > https://github.com/apache/incubator-airflow/pull/3830>)
> > > - etc.
> > >
> > > Airflow's setting-up can be quite flexible, but I believe there is some
> > > sort of best practice, especially in the organisations where scalability
> > is
> > > essential.
> > >
> > > Thanks for sharing in advance!
> > >
> > >
> > > Best regards,
> > > XD
> > >
> >
> 


Re: Delay between tasks executions issue

2018-09-11 Thread ramandumcs
Hi Chandu,
How many dag files are there on the scheduler. As per my understanding 
scheduler processes each file to trigger any dag/task run. It spawns number of 
processes equivalent to "max_threads" count to parallelize file processing. So 
you can try by increasing airflow config's max_threads count.

Thanks,
Raman Gupta 

On 2018/09/11 07:11:23, Chandu Kavar  wrote: 
> Hi All,
> 
> We are seeing there is a delay between triggering the next task. Please
> find the attached screenshot.
> 
> First and the last task are very small. So, you can see the tiny object.
> 
> [image: image.png]
> Do you suggest any airflow config can resolve this problem?
> 
> Thanks,
> Chandu
> 


Re: Delay between tasks executions issue

2018-09-11 Thread ramandumcs
We use max_threads = number of scheduler cores.

On 2018/09/11 09:49:53, Chandu Kavar  wrote: 
> Thanks Raman,
> 
> Understood.
> 
> We have around 500 DAGs. What value do you suggest for max_threads?
> 
> On Tue, Sep 11, 2018, 5:44 PM ramandu...@gmail.com 
> wrote:
> 
> > Hi Chandu,
> > How many dag files are there on the scheduler. As per my understanding
> > scheduler processes each file to trigger any dag/task run. It spawns number
> > of processes equivalent to "max_threads" count to parallelize file
> > processing. So you can try by increasing airflow config's max_threads count.
> >
> > Thanks,
> > Raman Gupta
> >
> > On 2018/09/11 07:11:23, Chandu Kavar  wrote:
> > > Hi All,
> > >
> > > We are seeing there is a delay between triggering the next task. Please
> > > find the attached screenshot.
> > >
> > > First and the last task are very small. So, you can see the tiny object.
> > >
> > > [image: image.png]
> > > Do you suggest any airflow config can resolve this problem?
> > >
> > > Thanks,
> > > Chandu
> > >
> >
> 


SqlAlchemy Pool config parameters to minimize connectivity issue impact

2018-09-25 Thread ramandumcs
Hi All,

We are observing that DAG tasks sometimes fail because of connectivity issues with
the MySQL server.
So are there any recommended settings for the MySQL pool-related parameters, like
sql_alchemy_pool_size = 5
sql_alchemy_pool_recycle = 3600
to minimise the impact of connectivity issues?
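For context, these live under [core] in airflow.cfg; the main thing we are unsure
about is how low to set the recycle interval (values below are illustrative; the
usual advice is to keep pool_recycle comfortably below MySQL's wait_timeout):

[core]
sql_alchemy_pool_size = 5
# recycle pooled connections before the MySQL server can time them out
sql_alchemy_pool_recycle = 1800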

Thanks,
Raman Gupta


Passing Custom Env variables while launching k8 Pod

2018-10-29 Thread ramandumcs
Hi All, 
Is there a way to provide  env variables while launching K8 pod through K8 
executor.
we need to pass some env variable which are referred inside our Airflow 
Operator.
so can we provide custom env variable to docker run command while launching 
task pod.
Currently it seems that it supports predefined env variable.
worker_configuration.py
def _get_environment(self):
    """Defines any necessary environment variables for the pod executor"""
    env = {
        'AIRFLOW__CORE__DAGS_FOLDER': '/tmp/dags',
        'AIRFLOW__CORE__EXECUTOR': 'LocalExecutor'
    }
    if self.kube_config.airflow_configmap:
        env['AIRFLOW__CORE__AIRFLOW_HOME'] = self.worker_airflow_home
    return env

Thanks,
Raman Gupta


Recommended backend metastore for Airflow

2018-12-10 Thread ramandumcs
Hi All,

It seems that Airflow supports MySQL, PostgreSQL and MSSQL as the backend store.
Is there any recommendation on using one over the others? We are expecting to run
1000(s) of concurrent DAGs, which would generate a heavy load on the backend
store.
Any pointer on this would be useful.

Thanks,
Raman Gupta


Re: Recommended backend metastore for Airflow

2018-12-10 Thread ramandumcs
Thanks Ash,
We are trying to run 1000 concurrent DAGs and are facing scalability issues with
MySQL, so we are exploring the other backend stores, PostgreSQL and MSSQL.
Is there any recommendation on Airflow config, like heartbeat interval, pool size,
etc., to support this much workload?

Thanks,
Raman Gupta

On 2018/12/10 15:12:41, Ash Berlin-Taylor  wrote: 
> Postgres.
> 
> Friends don't let friends use MySQL is my personal rule.
> 
> (I can get in to the reasons if you'd like, but the short version is I find 
> Postgres has more compliant behaviour with SQL standard, and a much better 
> query planner.)
> 
> -ash
> 
> > On 10 Dec 2018, at 15:10, ramandu...@gmail.com wrote:
> > 
> > Hi All,
> > 
> > It seems that Airflow supports mysql, postgresql and mssql as backend 
> > store. Any recommendation on using one over other. We are expecting to run 
> > 1000(s) of concurrent Dags which would generate heavy load on backend store.
> > Any pointer on this would be useful.
> > 
> > Thanks,
> > Raman Gupta
> 
>