Re: Custom (mongodb) hook as a plugin

2016-10-03 Thread Jakob Homan
On 23 September 2016 at 15:06, Rob Goretsky  wrote:
> More generally, would it make sense to create an enhancement to the plugin
> architecture that allows the developer of a new hook to also specify the
> name of a new 'connection type' along with what fields would be required
> for that connection type?  The web UI would then take its existing
> hardcoded list and append these plugged-in connection types and fields to
> display...

Yeah, I filed a ticket a while ago to do so
(https://issues.apache.org/jira/browse/AIRFLOW-235)

Additionally, it would be good if the connection types were not hardcoded,
so that it would be easier to add new ones directly (perhaps through the
config file).
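
For illustration, a minimal sketch of how a custom hook is registered
through the existing plugin mechanism (the hook, connection id, and plugin
names below are made up; the import paths are the 1.7-era ones):

    from airflow.hooks.base_hook import BaseHook
    from airflow.plugins_manager import AirflowPlugin

    class MongoHook(BaseHook):
        def __init__(self, mongo_conn_id='mongo_default'):
            self.mongo_conn_id = mongo_conn_id

        def get_conn(self):
            from pymongo import MongoClient  # assumes pymongo is installed
            conn = self.get_connection(self.mongo_conn_id)
            return MongoClient(conn.host, conn.port or 27017)

    class MongoPlugin(AirflowPlugin):
        name = 'mongo_plugin'
        hooks = [MongoHook]
        # There is currently no way to declare a new connection *type* (or
        # the fields it needs) here; that is what AIRFLOW-235 proposes.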

-Jakob


Re: Airflow Latency Between Task Scheduling and Task Execution

2016-10-03 Thread George Leslie-Waksman
If you want to run faster and you're only using one machine, I would
recommend using the LocalExecutor instead of the CeleryExecutor.
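
For example, in airflow.cfg (section and key names as in the 1.7 default
config; the 1-second heartbeats are the values Phil describes below):

    [core]
    executor = LocalExecutor

    [scheduler]
    job_heartbeat_sec = 1
    scheduler_heartbeat_sec = 1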

On the more general topic, Airflow is NOT designed for processing real-time
data. It is a workflow tool for batch processing. The sweet spot for
Airflow is running DAGs once per day. It works fine at once an hour, and
there are plenty of people pushing it down to once every 5 minutes or so,
but if you're operating on the order of seconds, you want a streaming data
processing tool.

Can you restructure your problem so that your real-time data is dumped
somewhere and Airflow processes batches of dumped data?
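
For example, a minimal sketch of that pattern (the directory, DAG, and task
names are made up): a frequently scheduled DAG that sweeps whatever was
dumped into a landing directory since the previous run.

    import os
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator

    DUMP_DIR = '/data/incoming'  # hypothetical landing directory

    def process_new_dumps(**context):
        # Process everything that accumulated since the last run.
        for name in sorted(os.listdir(DUMP_DIR)):
            path = os.path.join(DUMP_DIR, name)
            print('processing %s' % path)
            # ... parse/load the file, then move it out of the landing dir

    dag = DAG(
        dag_id='process_dumped_data',
        start_date=datetime(2016, 10, 1),
        schedule_interval=timedelta(minutes=5))

    PythonOperator(
        task_id='process_new_dumps',
        python_callable=process_new_dumps,
        provide_context=True,
        dag=dag)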

On Mon, Oct 3, 2016 at 10:57 AM Phil Daly  wrote:

> I posted this on StackOverflow but kind-of need a response “real soon now”
> …
>
> I'm sure this is a newbie question so apologies in advance. Running
> Airflow 1.7.x on a virtual Ubuntu 16.x machine with a MacBookPro host.
> Resources are not an issue. What I need to know is how to manipulate the
> scheduler latency: I have 2 tasks and task 2 follows task 1, nice and
> simple. With just this DAG, I noticed that task 2 would typically run 15s
> after task 1 completes and I'm wondering if I can get that much lower? I
> have re-configured to use a CeleryExecutor with 1 worker node and changed
> the job_heartbeat_sec and scheduler_heartbeat_sec to 1 (each). These are
> integers so I can't express sub-second scheduling. Now my task 2 will run
> ~3s after task 1 but I'd still like to get it lower, preferably sub-second.
> The wiki pages suggest that the scheduler can take 0.05-0.15s which, if
> that is not a typo, suggests sub-second task scheduling is possible. I can
> run this airflow invocation on a dedicated machine, if I have to, so that
> nothing else is interfering with it.
>
> So, am I pushing airflow too hard? Or can I get task 2 to run pretty much
> as soon as task 1 has finished? If so, how?
>
> I have added a PS for a bit more contextual information if you need it.
> Thanks in advance,
>
> Phil Daly
> p...@lsst.org
>
> A little bit more information on what I am trying to do (the OCS =
> observatory / observation control system):
>
>
> OCS could use a workflow engine for observation sequencing (and other
> tasks) with the following set up:
>
>   *   a firm real-time queue for executing night-time observations for
> each telescope. In this sense, we would like a workflow and the tasks it
> contains to be scheduled and executed within a very short amount of time,
> say, < 1s. How much latency we can tolerate here is open to question;
>   *   a soft real-time queue for executing day-time calibrations and
> engineering functions for each telescope. In this sense, the workflow and
> the tasks it contains should start promptly, but we can accept a scheduling
> delay of, say, 1–3s and perhaps longer;
>   *   a regular queue for cron-like jobs for each telescope (end of night
> reports etc). In this sense, we leave it to Airflow to determine the
> scheduling and accept that these jobs might not start for up to 30s after
> they become runnable.
>
> Clearly Airflow can handle the last two use cases, but I really need to
> know if I can make it fly for the first (the firm real-time queue).
>
>


Re: Regarding installation issue

2016-10-03 Thread Bolke de Bruin
We don’t support Python 2.6. (The AttributeError in your trace comes from
ElementTree's Element.iter(), which was only added in Python 2.7.)

Please upgrade to at least 2.7.
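
For example, a quick way to confirm which interpreter pip is building
against:

    python -c "import sys; print(sys.version)"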

Bolke

> On 3 Oct 2016, at 08:23, twinkle sachdeva  wrote:
> 
> Hi,
> 
> I have been trying to install Airflow on one of the VMs (Python version:
> 2.6.6, pip version: 7.1.0).
> 
> I am getting the following stack trace:
> 
> creating build/docs
>   copying docs/index.txt -> build/docs
>   Converting docs/index.txt -> build/docs/index.html
>   Traceback (most recent call last):
>     File "<string>", line 1, in <module>
>     File "/tmp/pip-build-pCrt4S/markdown/setup.py", line 270, in <module>
>       'Topic :: Text Processing :: Markup :: HTML'
>     File "/usr/lib64/python2.6/distutils/core.py", line 152, in setup
>       dist.run_commands()
>     File "/usr/lib64/python2.6/distutils/dist.py", line 975, in run_commands
>       self.run_command(cmd)
>     File "/usr/lib64/python2.6/distutils/dist.py", line 995, in run_command
>       cmd_obj.run()
>     File "/usr/lib/python2.6/site-packages/setuptools/command/install.py", line 53, in run
>       return _install.run(self)
>     File "/usr/lib64/python2.6/distutils/command/install.py", line 577, in run
>       self.run_command('build')
>     File "/usr/lib64/python2.6/distutils/cmd.py", line 333, in run_command
>       self.distribution.run_command(command)
>     File "/usr/lib64/python2.6/distutils/dist.py", line 995, in run_command
>       cmd_obj.run()
>     File "/usr/lib64/python2.6/distutils/command/build.py", line 134, in run
>       self.run_command(cmd_name)
>     File "/usr/lib64/python2.6/distutils/cmd.py", line 333, in run_command
>       self.distribution.run_command(command)
>     File "/usr/lib64/python2.6/distutils/dist.py", line 995, in run_command
>       cmd_obj.run()
>     File "/tmp/pip-build-pCrt4S/markdown/setup.py", line 184, in run
>       out = template % self._get_context(src, outfile)
>     File "/tmp/pip-build-pCrt4S/markdown/setup.py", line 116, in _get_context
>       c['body'] = self.md.convert(src)
>     File "build/lib/markdown/__init__.py", line 375, in convert
>       newRoot = treeprocessor.run(root)
>     File "build/lib/markdown/extensions/toc.py", line 229, in run
>       for el in doc.iter():
>   AttributeError: iter
> 
> Command "/usr/bin/python -c "import setuptools,
> tokenize;__file__='/tmp/pip-build-pCrt4S/markdown/setup.py';exec(compile(getattr(tokenize,
> 'open', open)(__file__).read().replace('\r\n', '\n'), __file__, 'exec'))"
> install --record /tmp/pip-RYL54E-record/install-record.txt
> --single-version-externally-managed --compile" failed with error code 1 in
> /tmp/pip-build-pCrt4S/markdown
> 
> 
> 
> Please help.
> 
> 
> Regards,
> 
> Twinkle



Airflow hangs but stays running

2016-10-03 Thread Renaud Grisoni
Hi all,

I use Airflow v1.7.1.3 with the local scheduler, and I am running into a
problem with the scheduler: for some reason the Airflow database becomes
unreachable, and the scheduler prints the OperationalError below. The problem
is that the scheduler does not kill itself after this error; the process
keeps running, but it no longer runs any DAGs. I cannot automatically restart
it with Supervisor because its process is still shown as running. Every time
there is a network error, Airflow prints this error and enters this "zombie"
mode, and my DAGs are not processed.

Has anyone come across this problem? Any suggestions?


Traceback (most recent call last):
  File "/usr/bin/airflow", line 15, in <module>
    args.func(args)
  File "/usr/lib/python2.7/site-packages/airflow/bin/cli.py", line 455, in scheduler
    job.run()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 173, in run
    self._execute()
  File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 712, in _execute
    paused_dag_ids = dagbag.paused_dags()
  File "/usr/lib/python2.7/site-packages/airflow/models.py", line 429, in paused_dags
    DagModel.is_paused == True)]
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2761, in __iter__
    return self._execute_and_instances(context)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2774, in _execute_and_instances
    close_with_result=True)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/query.py", line 2765, in _connection_from_session
    **kw)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 893, in connection
    execution_options=execution_options)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 898, in _connection_for_bind
    engine, execution_options)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 334, in _connection_for_bind
    conn = bind.contextual_connect()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2039, in contextual_connect
    self._wrap_pool_connect(self.pool.connect, None),
  File "/usr/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2078, in _wrap_pool_connect
    e, dialect, self)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 1405, in _handle_dbapi_exception_noconnection
    exc_info
  File "/usr/lib/python2.7/site-packages/sqlalchemy/util/compat.py", line 202, in raise_from_cause
    reraise(type(exception), exception, tb=exc_tb, cause=cause)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/engine/base.py", line 2074, in _wrap_pool_connect
    return fn()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/pool.py", line 376, in connect
    return _ConnectionFairy._checkout(self)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/pool.py", line 713, in _checkout
    fairy = _ConnectionRecord.checkout(pool)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/pool.py", line 485, in checkout
    rec.checkin()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
    compat.reraise(exc_type, exc_value, exc_tb)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/pool.py", line 482, in checkout
    dbapi_connection = rec.get_connection()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/pool.py", line 563, in get_connection
    self.connection = self.__connect()
  File "/usr/lib/python2.7/site-packages/sqlalchemy/pool.py", line 607, in __connect
    connection = self.__pool._invoke_creator(self)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/engine/strategies.py", line 97, in connect
    return dialect.connect(*cargs, **cparams)
  File "/usr/lib/python2.7/site-packages/sqlalchemy/engine/default.py", line 385, in connect
    return self.dbapi.connect(*cargs, **cparams)
  File "/usr/lib/python2.7/site-packages/psycopg2/__init__.py", line 164, in connect
    conn = _connect(dsn, connection_factory=connection_factory,
        async=async)
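
One possible mitigation, sketched here under the assumption that the
scheduler's heartbeat is visible in the metadata DB's job table (the DSN
and Supervisor program name below are made up for illustration):

    # Hypothetical watchdog, run from cron: if the scheduler's heartbeat in
    # the Airflow metadata DB has gone stale, ask Supervisor to restart it.
    import subprocess
    from datetime import datetime, timedelta

    import psycopg2

    STALE = timedelta(minutes=5)

    conn = psycopg2.connect('dbname=airflow user=airflow host=dbhost')
    cur = conn.cursor()
    cur.execute(
        "SELECT MAX(latest_heartbeat) FROM job "
        "WHERE job_type = 'SchedulerJob' AND state = 'running'")
    (latest,) = cur.fetchone()

    if latest is None or datetime.utcnow() - latest > STALE:
        # Scheduler is wedged (or dead): have Supervisor bounce it.
        subprocess.check_call(
            ['supervisorctl', 'restart', 'airflow-scheduler'])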