[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-09-30 Thread JIRA

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16187027#comment-16187027
 ] 

Christoph Hösler edited comment on AIRFLOW-342 at 9/30/17 6:01 PM:
---

I get a similar exception with airflow 1.8.2 using celery 4.1.0, rabbitmq 
3.6.12.
I'm using amqp 2.2.2 instead of librabbitmq because of 
https://issues.apache.org/jira/browse/AIRFLOW-1645.


was (Author: christoph.hoesler):
I got a similar exception with airflow 1.8.2 using celery 4.1.0, rabbitmq 
3.6.12.
I'm using amqp 2.2.2 instead of librabbitmq because of 
https://issues.apache.org/jira/browse/AIRFLOW-1645.

>  exception in 'airflow scheduler' : Connection reset by peer
> 
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: celery, scheduler
>Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>Reporter: Hila Visan
>Assignee: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
> executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
> self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
> state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
> binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
> nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
> self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
> self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
> write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
> frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-27 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/27/17 12:38 PM:


Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.
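The behavior the patch aims for can be sketched independently of Airflow: a loop that heartbeats at most once per interval and survives broker resets. The `executor` object and the `run_heartbeat_loop` helper below are hypothetical stand-ins for illustration, not Airflow's actual classes:

```python
import time
from datetime import datetime

def run_heartbeat_loop(executor, heartrate, iterations):
    """Heartbeat `executor` at most once every `heartrate` seconds,
    ignoring the occasional ConnectionResetError from the broker."""
    last_heartbeat = datetime.now()
    for _ in range(iterations):
        elapsed = (datetime.now() - last_heartbeat).total_seconds()
        if elapsed > heartrate:
            try:
                executor.heartbeat()
            except ConnectionResetError:
                pass  # broker reset the socket; retry on the next pass
            last_heartbeat = datetime.now()
        time.sleep(0.01)  # stand-in for the rest of the scheduler loop body
```

The crucial property is that a reset during one heartbeat does not unwind the whole loop, which is exactly what kills the unpatched scheduler.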


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:29 PM:
---

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, does 
nothing for us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed. 
Thus, wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, 
does nothing for us.


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:28 PM:
---

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed. 
Thus, wrapping the scheduler in a {{while True}} loop, as suggested elsewhere, 
does nothing for us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:54 PM:


Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Previously, the scheduler would die every ~10 seconds; now it can 
live for some minutes. Here's a modified patch that simply ignores 
{{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869793#comment-15869793
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:08 PM:


I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will 
close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.
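The rate-limiting half of the fix is a generic "do X at most once every N seconds" pattern. A minimal sketch (the `RateLimiter` name is mine for illustration, not an Airflow class):

```python
from datetime import datetime

class RateLimiter:
    """Gate an action to at most once every `min_interval` seconds."""
    def __init__(self, min_interval):
        self.min_interval = min_interval
        # Seed far in the past so the first check fires immediately,
        # like the datetime(2000, 1, 1) seed used elsewhere in the
        # same scheduler function.
        self.last_fired = datetime(2000, 1, 1)

    def should_fire(self):
        now = datetime.now()
        if (now - self.last_fired).total_seconds() > self.min_interval:
            self.last_fired = now
            return True
        return False
```

In the patch, `self.heartrate` plays the role of `min_interval`, so RabbitMQ sees at most one heartbeat per heartrate window instead of one per scheduler-loop iteration.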


was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit 
heartbeats to the executor, and if they happen too often, RabbitMQ will 
close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py  2017-02-16 11:41:10.0 +
+++ jobs.py 2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
 
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag,
                                          (State.SCHEDULED,))
 
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
 
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from 
an executor heartbeat, as they could still occur, but I'll leave the patch as-is 
to show the minimal change required.


[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM:


This may be a Celery issue when the worker starts up when there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, "airflow scheduler" only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.


was (Author: erikcederstrand):
This may be a Celery issue when the worker starts up when there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, {{airflow scheduler}} only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.



[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM:


This may be a Celery issue when the worker starts up when there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.

Interestingly, {{airflow scheduler}} only crashes if there are tasks in "STARTED" 
state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.


was (Author: erikcederstrand):
This may be a Celery issue when the worker starts up when there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.



[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer

2017-02-16 Thread Erik Cederstrand (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15869598#comment-15869598
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 9:40 AM:
---

This may be a Celery issue when the worker starts up when there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773

This was with celery 4.0.2. Downgrading to 3.1.23 did not help.


was (Author: erikcederstrand):
This may be a Celery issue when the worker starts up when there are already 
messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the 
exact same stack trace in https://github.com/celery/celery/issues/3773
