[ 
https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870
 ] 

Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:29 PM:
-------------------------------------------------------------------

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Preciously, the scheduler would die every ~10 seconds, now it can 
live for some minutes. Here's a modified patch to simply ignore 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 
11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py      2017-02-16 
11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
                 self._execute_task_instances(simple_dag_bag,
                                              (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         
last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets 
the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As a comment to the justifiability of this patch, our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed, 
leaving jobs in the celery queue indefinitely and celery workers idling. Thus, 
wrapping the scheduler in a {{while True}} loop as suggested elsewhere does 
nothing for us.


was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less 
frequently. Preciously, the scheduler would die every ~10 seconds, now it can 
live for some minutes. Here's a modified patch to simply ignore 
{{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 
11:58:55.057991344 +0000
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py      2017-02-16 
11:57:07.060060262 +0000
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()

@@ -1436,9 +1438,14 @@
                 self._execute_task_instances(simple_dag_bag,
                                              (State.SCHEDULED,))

-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         
last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets 
the socket connection
+                last_executor_heartbeat_time = datetime.now()

             # Process events from the executor
             self._process_executor_events()
{code}

As acomment to the justifiability of this patch, our scheduler in production 
often dies so early in the scheduling process that jobs are never progressed. 
Thus, wrapping the scheduler in a {{while True}} loop as suggested elsewhere 
does nothing for us.

>  exception in 'airflow scheduler' : Connection reset by peer
> ------------------------------------------------------------
>
>                 Key: AIRFLOW-342
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-342
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: celery, scheduler
>    Affects Versions: Airflow 1.7.1.3
>         Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
>            Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it. 
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>  
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset 
> by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in 
> _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", 
> line 107, in heartbeat
>     self.sync()
>   File 
> "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 
> 74, in sync
>     state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
>     return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in 
> _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, 
> in get_task_meta
>     binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in 
> declare
>    self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in 
> declare
>     nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in 
> exchange_declare
>     self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, 
> in _send_method
>     self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, 
> in write_method
>     write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in 
> write_frame
>     frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to