[jira] [Created] (AIRFLOW-883) Assigning operator to DAG via bitwise composition does not pick up default args

2017-02-16 Thread Daniel Huang (JIRA)
Daniel Huang created AIRFLOW-883:


 Summary: Assigning operator to DAG via bitwise composition does 
not pick up default args
 Key: AIRFLOW-883
 URL: https://issues.apache.org/jira/browse/AIRFLOW-883
 Project: Apache Airflow
  Issue Type: Bug
  Components: models
Reporter: Daniel Huang
Priority: Minor


This is only the case when the operator does not specify {{dag=dag}} and is not 
initialized within a DAG's context manager (due to 
https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/utils/decorators.py#L50)

Example:

{code}
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 2, 1),
}
dag = DAG('my_dag', default_args=default_args)
dummy = DummyOperator(task_id='dummy')

dag >> dummy
{code}

This will raise a {{Task is missing the start_date parameter}} error. I _think_ 
this should probably be allowed, because I assume the purpose of supporting 
{{dag >> op}} was to allow delayed assignment of an operator to a DAG. 

I believe that to fix this, on assignment we would need to go back through 
dag.default_args and check whether any of those attrs weren't explicitly set on 
the task... not the cleanest. 
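
For illustration only, here is a minimal sketch of that backfill-on-assignment 
idea; the helper name {{_apply_dag_defaults}} is hypothetical and not existing 
Airflow API:

{code}
def _apply_dag_defaults(dag, task):
    """Copy dag.default_args onto any task attributes that were never set explicitly.

    Sketch only -- a real fix would need to respect apply_defaults semantics,
    e.g. only touching arguments the operator actually accepts.
    """
    for key, value in (dag.default_args or {}).items():
        if getattr(task, key, None) is None:
            setattr(task, key, value)


# Hypothetically, the dag >> dummy assignment path could call this before
# validating the task, so dummy would pick up the DAG-level start_date:
# _apply_dag_defaults(dag, dummy)
{code}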



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-882) Code example in docs has unnecessary DAG>>Operator assignment

2017-02-16 Thread Daniel Huang (JIRA)
Daniel Huang created AIRFLOW-882:


 Summary: Code example in docs has unnecessary DAG>>Operator 
assignment
 Key: AIRFLOW-882
 URL: https://issues.apache.org/jira/browse/AIRFLOW-882
 Project: Apache Airflow
  Issue Type: Improvement
  Components: docs
Reporter: Daniel Huang
Assignee: Daniel Huang
Priority: Trivial


The docs currently say:

{code}
We can put this all together to build a simple pipeline:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        dag
        >> DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
{code}

But the {{dag >> ...}} is unnecessary because the operators are already 
initialized with the proper DAG 
(https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/models.py#L1699).
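
For reference, dropping the redundant {{dag >>}} leaves the example as follows 
(same operators; imports assumed from the surrounding docs page):

{code}
with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
{code}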



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Work started] (AIRFLOW-881) Create SubDagOperator within DAG context manager without passing dag param

2017-02-16 Thread Daniel Huang (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Work on AIRFLOW-881 started by Daniel Huang.

> Create SubDagOperator within DAG context manager without passing dag param
> --
>
> Key: AIRFLOW-881
> URL: https://issues.apache.org/jira/browse/AIRFLOW-881
> Project: Apache Airflow
>  Issue Type: Improvement
>  Components: operators, subdag
>Reporter: Daniel Huang
>Assignee: Daniel Huang
>Priority: Trivial
>
> Currently, the following raises a {{Please pass in the `dag` param}} 
> exception:
> {code}
> with DAG('my_dag', default_args=default_args) as dag:
>     op = SubDagOperator(task_id='my_subdag', subdag=subdag_factory(...))
> {code}
> But the SubDagOperator should be aware that it's inside a DAG's context manager 
> without having to specify {{dag=dag}} when initializing the operator, similar to 
> how the {{@apply_defaults}} decorator does it 
> (https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/utils/decorators.py#L50).



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-881) Create SubDagOperator within DAG context manager without passing dag param

2017-02-16 Thread Daniel Huang (JIRA)
Daniel Huang created AIRFLOW-881:


 Summary: Create SubDagOperator within DAG context manager without 
passing dag param
 Key: AIRFLOW-881
 URL: https://issues.apache.org/jira/browse/AIRFLOW-881
 Project: Apache Airflow
  Issue Type: Improvement
  Components: operators, subdag
Reporter: Daniel Huang
Assignee: Daniel Huang
Priority: Trivial


Currently, the following raises a {{Please pass in the `dag` param}} exception:

{code}
with DAG('my_dag', default_args=default_args) as dag:
    op = SubDagOperator(task_id='my_subdag', subdag=subdag_factory(...))
{code}

But the SubDagOperator should be aware that it's inside a DAG's context manager 
without having to specify {{dag=dag}} when initializing the operator, similar to 
how the {{@apply_defaults}} decorator does it 
(https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/utils/decorators.py#L50).
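
For illustration, a minimal sketch of the fallback being described, assuming the 
module-level {{_CONTEXT_MANAGER_DAG}} slot that the DAG context manager sets in 
airflow/models.py of this vintage; the helper itself is hypothetical:

{code}
import airflow.models


def _resolve_dag(dag=None):
    """Return the explicitly passed dag, else the one pushed by `with DAG(...) as dag:`."""
    if dag is not None:
        return dag
    # Fall back to the DAG the context manager made current (attribute name assumed).
    return getattr(airflow.models, '_CONTEXT_MANAGER_DAG', None)


# SubDagOperator.__init__ could then validate its `subdag` argument against
# _resolve_dag(dag) instead of requiring dag=dag to be passed explicitly.
{code}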



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-880) Fix remote log functionality inconsistencies for Webservers

2017-02-16 Thread Dan Davydov (JIRA)
Dan Davydov created AIRFLOW-880:
---

 Summary: Fix remote log functionality inconsistencies for 
Webservers
 Key: AIRFLOW-880
 URL: https://issues.apache.org/jira/browse/AIRFLOW-880
 Project: Apache Airflow
  Issue Type: Bug
  Components: webserver
Reporter: Dan Davydov
Assignee: Dan Davydov


Right now log functionality is not consistent when it comes to remote logs.

1. Lack of Complete Logs: Remote logs should be loaded by default instead of only 
as a fallback when the local log is not present, because the remote log will have 
the logs for all of the tries of a task, whereas the local log is only guaranteed 
to have the most recent one.

2. Lack of Consistency: The logs returned should always be the same from all of 
the webservers (right now different logs can be returned depending on whether a 
webserver has a local copy of the log, and webservers that do have the log can 
return different contents).
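
For illustration, the fetch order argued for above might look like the following 
sketch; the helper and its arguments are hypothetical, not Airflow's webserver API:

{code}
def get_task_log(fetch_remote, fetch_local, task_instance):
    """Prefer the remote log (complete across all tries); fall back to the local file."""
    log = fetch_remote(task_instance)   # e.g. read from S3/GCS
    if log:
        return log
    # The local file is only guaranteed to hold the most recent try.
    return fetch_local(task_instance)
{code}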



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:29 PM:
---

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress, leaving 
jobs in the celery queue indefinitely and celery workers idling. Thus, wrapping 
the scheduler in a {{while True}} loop, as suggested elsewhere, does nothing for 
us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
>  

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:28 PM:
---

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs never progress. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] 

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:54 PM:


Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> l

[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:52 PM:


I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionResetError}} exceptions 
from an executor heartbeat, as they could still occur, but I'll leave the patch 
as-is to show the minimal change required.


was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:08 PM:


I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.


was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packag

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:08 PM:


I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.


was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.hea

[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 11:47 AM:


I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.


was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{{code}}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{{code}}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> li

[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will close 
the connection. Here's a patch that fixes the exception for me:

{{code}}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{{code}}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM:


This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, "airflow scheduler" only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.


was (Author: erikcederstrand):
This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, {airflow scheduler} only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM:


This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, {airflow scheduler} only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.


was (Author: erikcederstrand):
This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 9:40 AM:
---

This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.


was (Author: erikcederstrand):
This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869598#comment-15869598
 ] 

Erik Cederstrand commented on AIRFLOW-342:
--

This may be a Celery issue that occurs when the worker starts up while there are 
already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 
and the exact same stack trace in https://github.com/celery/celery/issues/3773.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)