[jira] [Created] (AIRFLOW-883) Assigning operator to DAG via bitwise composition does not pickup default args
Daniel Huang created AIRFLOW-883: Summary: Assigning operator to DAG via bitwise composition does not pick up default args Key: AIRFLOW-883 URL: https://issues.apache.org/jira/browse/AIRFLOW-883 Project: Apache Airflow Issue Type: Bug Components: models Reporter: Daniel Huang Priority: Minor

This is only the case when the operator does not specify {{dag=dag}} and is not initialized within a DAG's context manager (due to https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/utils/decorators.py#L50). Example:

{code}
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 2, 1)
}

dag = DAG('my_dag', default_args=default_args)
dummy = DummyOperator(task_id='dummy')
dag >> dummy
{code}

This will raise a {{Task is missing the start_date parameter}} error. I _think_ this should probably be allowed, because I assume the purpose of supporting {{dag >> op}} was to allow delayed assignment of an operator to a DAG. I believe that to fix this, on assignment we would need to go back through dag.default_args and check which of those attrs weren't explicitly set on the task... not the cleanest.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
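The back-fill idea described above can be sketched without Airflow at all. This is a minimal toy model, not Airflow's real API: `Dag`, `Operator`, and the `__rshift__` back-fill logic are all illustrative stand-ins for `DAG`, `BaseOperator`, and the proposed fix.

```python
# Toy model of the bug: default_args are merged into an operator only at
# __init__ time (as @apply_defaults does), so a task created without a dag
# misses them, and a later `dag >> task` assignment must back-fill them.
class Dag:
    def __init__(self, dag_id, default_args=None):
        self.dag_id = dag_id
        self.default_args = default_args or {}

    def __rshift__(self, task):  # dag >> task
        task.dag = self
        # Proposed fix: back-fill any default not explicitly set at init time.
        for key, value in self.default_args.items():
            if getattr(task, key, None) is None:
                setattr(task, key, value)
        return task


class Operator:
    def __init__(self, task_id, start_date=None, dag=None):
        self.task_id = task_id
        self.dag = dag
        # Defaults are applied here only when a dag is already known.
        if dag is not None and start_date is None:
            start_date = dag.default_args.get('start_date')
        self.start_date = start_date


dag = Dag('my_dag', default_args={'start_date': '2017-02-01'})
dummy = Operator(task_id='dummy')   # no dag => no defaults applied
assert dummy.start_date is None     # this is what triggers the error
dag >> dummy                        # late assignment back-fills defaults
assert dummy.start_date == '2017-02-01'
```

The awkwardness the reporter mentions is visible here: `__rshift__` has to guess which attributes were "not explicitly set" (`None` in this sketch), which is why the fix is described as not the cleanest.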
[jira] [Created] (AIRFLOW-882) Code example in docs has unnecessary DAG>>Operator assignment
Daniel Huang created AIRFLOW-882: Summary: Code example in docs has unnecessary DAG>>Operator assignment Key: AIRFLOW-882 URL: https://issues.apache.org/jira/browse/AIRFLOW-882 Project: Apache Airflow Issue Type: Improvement Components: docs Reporter: Daniel Huang Assignee: Daniel Huang Priority: Trivial

The docs currently say:

{code}
We can put this all together to build a simple pipeline:

with DAG('my_dag', start_date=datetime(2016, 1, 1)) as dag:
    (
        dag
        >> DummyOperator(task_id='dummy_1')
        >> BashOperator(
            task_id='bash_1',
            bash_command='echo "HELLO!"')
        >> PythonOperator(
            task_id='python_1',
            python_callable=lambda: print("GOODBYE!"))
    )
{code}

But the {{dag >> ...}} is unnecessary because the operators are already initialized with the proper DAG (https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/models.py#L1699).
[jira] [Work started] (AIRFLOW-881) Create SubDagOperator within DAG context manager without passing dag param
[ https://issues.apache.org/jira/browse/AIRFLOW-881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Work on AIRFLOW-881 started by Daniel Huang.

> Create SubDagOperator within DAG context manager without passing dag param
> --------------------------------------------------------------------------
>
> Key: AIRFLOW-881
> URL: https://issues.apache.org/jira/browse/AIRFLOW-881
> Project: Apache Airflow
> Issue Type: Improvement
> Components: operators, subdag
> Reporter: Daniel Huang
> Assignee: Daniel Huang
> Priority: Trivial
>
> Currently, the following raises a {{Please pass in the `dag` param}} exception:
> {code}
> with DAG('my_dag', default_args=default_args) as dag:
>     op = SubDagOperator(task_id='my_subdag', subdag=subdag_factory(...))
> {code}
> But the SubDagOperator should be aware that it's in the context manager of a
> dag without having to specify {{dag=dag}} when initializing the operator.
> Similar to how the {{@apply_defaults}} decorator does it
> (https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/utils/decorators.py#L50).
[jira] [Created] (AIRFLOW-881) Create SubDagOperator within DAG context manager without passing dag param
Daniel Huang created AIRFLOW-881: Summary: Create SubDagOperator within DAG context manager without passing dag param Key: AIRFLOW-881 URL: https://issues.apache.org/jira/browse/AIRFLOW-881 Project: Apache Airflow Issue Type: Improvement Components: operators, subdag Reporter: Daniel Huang Assignee: Daniel Huang Priority: Trivial

Currently, the following raises a {{Please pass in the `dag` param}} exception:

{code}
with DAG('my_dag', default_args=default_args) as dag:
    op = SubDagOperator(task_id='my_subdag', subdag=subdag_factory(...))
{code}

But the SubDagOperator should be aware that it's in the context manager of a dag without having to specify {{dag=dag}} when initializing the operator. Similar to how the {{@apply_defaults}} decorator does it (https://github.com/apache/incubator-airflow/blob/fb0c5775cda4f84c07d8d5c0e6277fc387c172e6/airflow/utils/decorators.py#L50).
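The context-manager mechanism this issue refers to can be sketched in miniature. This is an Airflow-free toy, not the real internals: the `_CONTEXT_DAGS` stack, `DAG`, and `Operator` names here are illustrative stand-ins for what `DAG.__enter__` and `@apply_defaults` do in the linked decorator code.

```python
# Toy model of the DAG context manager: entering a DAG pushes it onto a
# stack, and operators created without dag= fall back to the innermost
# active DAG, which is what AIRFLOW-881 asks SubDagOperator to do.
_CONTEXT_DAGS = []


class DAG:
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def __enter__(self):
        _CONTEXT_DAGS.append(self)
        return self

    def __exit__(self, *exc):
        _CONTEXT_DAGS.pop()


class Operator:
    def __init__(self, task_id, dag=None):
        # Fall back to the innermost active DAG context.
        if dag is None and _CONTEXT_DAGS:
            dag = _CONTEXT_DAGS[-1]
        if dag is None:
            raise ValueError("Please pass in the `dag` param")
        self.task_id = task_id
        self.dag = dag
        dag.tasks.append(self)


with DAG('my_dag') as dag:
    op = Operator(task_id='my_subdag')  # no dag= needed inside the context

assert op.dag is dag
assert [t.task_id for t in dag.tasks] == ['my_subdag']
```

The bug was that SubDagOperator's own `__init__` needed the dag before this fallback kicked in, so it raised the {{Please pass in the `dag` param}} error even inside the `with` block.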
[jira] [Created] (AIRFLOW-880) Fix remote log functionality inconsistencies for Webservers
Dan Davydov created AIRFLOW-880: Summary: Fix remote log functionality inconsistencies for Webservers Key: AIRFLOW-880 URL: https://issues.apache.org/jira/browse/AIRFLOW-880 Project: Apache Airflow Issue Type: Bug Components: webserver Reporter: Dan Davydov Assignee: Dan Davydov

Right now log functionality is not consistent when it comes to remote logs.

1. Lack of complete logs: the remote log should be the default, rather than a fallback that is loaded only when the local log is missing, because the remote log contains the logs for all tries of a task, whereas the local log is only guaranteed to have the most recent try.
2. Lack of consistency: the logs returned should always be the same from all webservers (right now different logs can be returned depending on whether a given webserver has a local copy, and webservers that do have the log can return different contents).
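The fetch policy proposed in point 1 can be sketched as a small lookup function. This is a hedged illustration only: the function name and the dicts standing in for the remote store (e.g. S3/GCS) and the local worker filesystem are assumptions, not Airflow's actual log-reading API.

```python
# Sketch of the proposed policy: prefer the remote log (complete across
# tries, identical for every webserver) and fall back to the local worker
# log only when no remote copy exists yet.
def read_task_log(task_key, remote_store, local_store):
    # Remote first: it holds every try and is the same for all webservers.
    if task_key in remote_store:
        return remote_store[task_key]
    # Local fallback: may only hold the most recent try.
    return local_store.get(task_key, '')


remote = {'my_dag.task_1': 'try 1 log\ntry 2 log\n'}
local = {
    'my_dag.task_1': 'try 2 log\n',   # local copy is missing try 1
    'my_dag.task_2': 'try 1 log\n',   # not uploaded remotely yet
}

assert read_task_log('my_dag.task_1', remote, local) == 'try 1 log\ntry 2 log\n'
assert read_task_log('my_dag.task_2', remote, local) == 'try 1 log\n'
```

Because every webserver consults the same remote store first, point 2 (identical results from all webservers) falls out of the same change whenever a remote copy exists.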
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:29 PM:

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch that simply ignores {{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production often dies so early in the scheduling process that jobs are never progressed, leaving jobs in the celery queue indefinitely and celery workers idling. Thus, wrapping the scheduler in a {{while True}} loop as suggested elsewhere does nothing for us.

was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch that simply ignores {{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production often dies so early in the scheduling process that jobs are never progressed. Thus, wrapping the scheduler in a {{while True}} loop as suggested elsewhere does nothing for us.
> exception in 'airflow scheduler' : Connection reset by peer
> -----------------------------------------------------------
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
>
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 1:28 PM:

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch that simply ignores {{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

As a comment on the justifiability of this patch: our scheduler in production often dies so early in the scheduling process that jobs are never progressed. Thus, wrapping the scheduler in a {{while True}} loop as suggested elsewhere does nothing for us.

was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch that simply ignores {{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

> exception in 'airflow scheduler' : Connection reset by peer
> -----------------------------------------------------------
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>
> [2016-06-30 19:00:10,130]
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:54 PM:

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch that simply ignores {{ConnectionResetError}}, which has been running for ~1 hour now:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

was (Author: erikcederstrand):
Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch to simply ignore {{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

> exception in 'airflow scheduler' : Connection reset by peer
> -----------------------------------------------------------
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", l
[jira] [Commented] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869870#comment-15869870 ] Erik Cederstrand commented on AIRFLOW-342:

Hmm. Executor heartbeats still throw {{ConnectionResetError}}, but much less frequently. Previously, the scheduler would die every ~10 seconds; now it can live for some minutes. Here's a modified patch to simply ignore {{ConnectionResetError}}:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,14 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                try: self.executor.heartbeat()
+                except ConnectionResetError: pass  # RabbitMQ sometimes resets the socket connection
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

> exception in 'airflow scheduler' : Connection reset by peer
> -----------------------------------------------------------
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107, in heartbeat
>     self.sync()
>   File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 74, in sync
>     state = async.state
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
>     return self._get_task_meta()['status']
>   File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
>     return self._maybe_set_cache(self.backend.get_task_meta(self.id))
>   File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
>     binding.declare()
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
>     self.exchange.declare(nowait)
>   File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
>     nowait=nowait, passive=passive,
>   File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
>     self._send_method((40, 10), args)
>   File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
>     self.channel_id, method_sig, args, content,
>   File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
>     write_frame(1, channel, payload)
>   File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
>     frame_type, channel, size, payload, 0xce,
>   File "/usr/lib64/python2.7/socket.py", line 224, in meth
>     return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!
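The rate-limiting-plus-suppression approach in the patches on this thread can be sketched as a standalone loop. This is an Airflow-free toy, assuming a made-up `Executor` stand-in; the real scheduler interleaves this with task scheduling and uses `self.heartrate` from config.

```python
# Standalone sketch of a rate-limited, reset-tolerant heartbeat loop:
# heartbeat at most once per `heartrate` seconds, and swallow
# ConnectionResetError so a broker socket reset doesn't kill the loop.
import time


class Executor:
    """Stand-in for an Airflow executor; real heartbeat() syncs task state
    and may raise ConnectionResetError if the broker drops the socket."""

    def __init__(self):
        self.beats = 0

    def heartbeat(self):
        self.beats += 1


def run_loop(executor, heartrate=1.0, iterations=5, pause=0.3):
    last_executor_heartbeat_time = time.monotonic()
    for _ in range(iterations):
        # ... schedule task instances here ...
        elapsed = time.monotonic() - last_executor_heartbeat_time
        if elapsed > heartrate:
            try:
                executor.heartbeat()
            except ConnectionResetError:
                pass  # broker reset the socket; try again on the next pass
            last_executor_heartbeat_time = time.monotonic()
        time.sleep(pause)
    return executor.beats


# Five 0.3s passes with heartrate=1.0 allow at most one heartbeat,
# versus five without the rate limit.
beats = run_loop(Executor())
assert beats <= 1
```

Rate limiting addresses the cause identified on the thread (heartbeats arriving too fast for RabbitMQ), while the `except ConnectionResetError` guard covers resets that occur anyway.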
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:52 PM:

I think I found the culprit. The scheduler is not careful to rate-limit heartbeats to the executor, and if they happen too often, RabbitMQ will close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionResetError}} exceptions from an executor heartbeat, as they could still occur, but I'll leave the patch as-is to show the minimal change required.

was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit heartbeats to the executor, and if they happen too often, RabbitMQ will close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from an executor heartbeat, as they could still occur, but I'll leave the patch as-is to show the minimal change required.

> exception in 'airflow scheduler' : Connection reset by peer
> -----------------------------------------------------------
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:08 PM:

I think I found the culprit. The scheduler is not careful to rate-limit heartbeats to the executor, and if they happen too often, RabbitMQ will close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig	2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from an executor heartbeat, as they could still occur, but I'll leave the patch as-is to show the minimal change required.

was (Author: erikcederstrand):
I think I found the culprit. The scheduler is not careful to rate-limit heartbeats to the executor, and if they happen too often, RabbitMQ will close the connection. Here's a patch that fixes the exception for me:

{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py	2017-02-16 11:41:10.0 +
+++ jobs.py	2017-02-16 11:40:28.638116325 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}

I still think the scheduler should survive {{ConnectionClosed}} exceptions from an executor heartbeat, as they could still occur, but I'll leave the patch as-is to show the minimal change required.

> exception in 'airflow scheduler' : Connection reset by peer
> -----------------------------------------------------------
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
>
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
>   File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
>     executor.heartbeat()
>   File "/usr/lib/python2.7/site-packag
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869793#comment-15869793 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 12:08 PM: I think I found the culprit. The scheduler is not careful to rate-limit heartbeats to the executor, and if they happen too often, then RabbitMQ will close the connection. Here's a patch that fixes the exception for me:
{code}
--- /usr/local/lib/python3.4/dist-packages/airflow/jobs.py.orig 2017-02-16 11:58:55.057991344 +
+++ /usr/local/lib/python3.4/dist-packages/airflow/jobs.py 2017-02-16 11:57:07.060060262 +
@@ -1371,6 +1371,8 @@
         last_stat_print_time = datetime(2000, 1, 1)
         # Last time that self.heartbeat() was called.
         last_self_heartbeat_time = datetime.now()
+        # Last time that self.executor.heartbeat() was called.
+        last_executor_heartbeat_time = datetime.now()
         # Last time that the DAG dir was traversed to look for files
         last_dag_dir_refresh_time = datetime.now()
@@ -1436,9 +1438,13 @@
             self._execute_task_instances(simple_dag_bag, (State.SCHEDULED,))
-            # Call hearbeats
-            self.logger.info("Heartbeating the executor")
-            self.executor.heartbeat()
+            # Heartbeat the executor periodically
+            time_since_last_heartbeat = (datetime.now() -
+                last_executor_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the executor")
+                self.executor.heartbeat()
+                last_executor_heartbeat_time = datetime.now()
             # Process events from the executor
             self._process_executor_events()
{code}
I still think the scheduler should survive {{ConnectionClosed}} exceptions from an executor heartbeat, as they could still occur, but I'll leave the patch as-is to show the minimal change required.
> exception in 'airflow scheduler' : Connection reset by peer
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
> File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
> executor.heartbeat()
> File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107, in heartbeat
> self.sync()
> File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 74, in sync
> state = async.state
> File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
> File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
> File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
> binding.declare()
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
> self.exchange.declare(nowait)
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
> nowait=nowait, passive=passive,
> File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
> self._send_method((40, 10), args)
> File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
> self.channel_id, method_sig, args, content,
> File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
> write_frame(1, channel, payload)
> File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
> frame_type, channel, size, payload, 0xce,
> File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!
-- This message was sent by Atlassian JIRA (v6.3.15#6346)
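The patch above boils down to a simple elapsed-time gate: only forward a heartbeat when at least `heartrate` seconds have passed since the last one. A minimal standalone sketch of that rate-limiting pattern (not actual Airflow code; the class name and injectable clock are illustrative):

```python
from datetime import datetime, timedelta


class RateLimitedHeartbeat:
    """Gate an expensive heartbeat call behind a minimum interval."""

    def __init__(self, heartrate, clock=datetime.now):
        self.heartrate = heartrate  # minimum seconds between heartbeats
        self._clock = clock         # injectable for testing
        self._last = clock()        # last time a heartbeat was sent
        self.count = 0              # heartbeats actually sent

    def maybe_heartbeat(self):
        """Send a heartbeat only if enough time has passed; True if sent."""
        now = self._clock()
        if (now - self._last).total_seconds() > self.heartrate:
            self.count += 1  # stand-in for self.executor.heartbeat()
            self._last = now
            return True
        return False
```

A back-to-back call right after a successful heartbeat returns False, because the timer resets on each send, which is exactly what keeps RabbitMQ from seeing a flood of connections.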
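The commenter also argues that the scheduler should survive {{ConnectionClosed}} exceptions from an executor heartbeat, since a broker can still drop the connection at any time. A hedged sketch of that resilience under stated assumptions: the `ConnectionClosed` class and `do_heartbeat` callable below are illustrative stand-ins, not actual Airflow or Celery APIs.

```python
import logging


class ConnectionClosed(Exception):
    """Stand-in for the broker's connection-closed error."""


def safe_heartbeat(do_heartbeat, logger=logging.getLogger(__name__)):
    """Run one heartbeat attempt; log and continue instead of crashing.

    Returns True if the heartbeat succeeded, False if the broker
    connection was closed (the caller just retries on the next loop).
    """
    try:
        do_heartbeat()
        return True
    except ConnectionClosed:
        logger.warning("Executor heartbeat failed; will retry next loop")
        return False
```

The design choice here is to treat a dropped broker connection as a transient condition of one loop iteration rather than a fatal scheduler error, which matches the observation in the issue that the workers keep processing tasks regardless.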
[jira] [Comment Edited] (AIRFLOW-342) exception in 'airflow scheduler' : Connection reset by peer
[ https://issues.apache.org/jira/browse/AIRFLOW-342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869598#comment-15869598 ] Erik Cederstrand edited comment on AIRFLOW-342 at 2/16/17 10:35 AM: This may be a Celery issue when the worker starts up while there are already messages in RabbitMQ. See https://github.com/celery/celery/issues/3620 and the exact same stack trace in https://github.com/celery/celery/issues/3773 This was with celery 4.0.2. Downgrading to 3.1.23 did not help. Interestingly, "airflow scheduler" only crashes if there are tasks in "STARTED" state when it starts. If all tasks are "SUCCESS", the scheduler does not crash.
> exception in 'airflow scheduler' : Connection reset by peer
>
> Key: AIRFLOW-342
> URL: https://issues.apache.org/jira/browse/AIRFLOW-342
> Project: Apache Airflow
> Issue Type: Bug
> Components: celery, scheduler
> Affects Versions: Airflow 1.7.1.3
> Environment: OS: Red Hat Enterprise Linux Server 7.2 (Maipo)
> Python: 2.7.5
> Airflow: 1.7.1.3
> Reporter: Hila Visan
>
> 'airflow scheduler' command throws an exception when running it.
> Despite the exception, the workers run the tasks from the queues as expected.
> Error details:
> [2016-06-30 19:00:10,130] {jobs.py:758} ERROR - [Errno 104] Connection reset by peer
> Traceback (most recent call last):
> File "/usr/lib/python2.7/site-packages/airflow/jobs.py", line 755, in _execute
> executor.heartbeat()
> File "/usr/lib/python2.7/site-packages/airflow/executors/base_executor.py", line 107, in heartbeat
> self.sync()
> File "/usr/lib/python2.7/site-packages/airflow/executors/celery_executor.py", line 74, in sync
> state = async.state
> File "/usr/lib/python2.7/site-packages/celery/result.py", line 394, in state
> return self._get_task_meta()['status']
> File "/usr/lib/python2.7/site-packages/celery/result.py", line 339, in _get_task_meta
> return self._maybe_set_cache(self.backend.get_task_meta(self.id))
> File "/usr/lib/python2.7/site-packages/celery/backends/amqp.py", line 163, in get_task_meta
> binding.declare()
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 521, in declare
> self.exchange.declare(nowait)
> File "/usr/lib/python2.7/site-packages/kombu/entity.py", line 174, in declare
> nowait=nowait, passive=passive,
> File "/usr/lib/python2.7/site-packages/amqp/channel.py", line 615, in exchange_declare
> self._send_method((40, 10), args)
> File "/usr/lib/python2.7/site-packages/amqp/abstract_channel.py", line 56, in _send_method
> self.channel_id, method_sig, args, content,
> File "/usr/lib/python2.7/site-packages/amqp/method_framing.py", line 221, in write_method
> write_frame(1, channel, payload)
> File "/usr/lib/python2.7/site-packages/amqp/transport.py", line 182, in write_frame
> frame_type, channel, size, payload, 0xce,
> File "/usr/lib64/python2.7/socket.py", line 224, in meth
> return getattr(self._sock,name)(*args)
> error: [Errno 104] Connection reset by peer
> [2016-06-30 19:00:10,131] {jobs.py:759} ERROR - Tachycardia!
-- This message was sent by Atlassian JIRA (v6.3.15#6346)