[ https://issues.apache.org/jira/browse/AIRFLOW-2844?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576658#comment-16576658 ]

Kyle Bridenstine commented on AIRFLOW-2844:
-------------------------------------------

*Root Cause Of Issue:*
 * [https://github.com/apache/incubator-airflow/pull/2484] 

*Stackoverflow Posts That Helped Solve The Issue:*
 * [https://stackoverflow.com/questions/51365911/airflow-logs-brokenpipeexception/51790409#51790409]
 * [https://stackoverflow.com/questions/51775370/airflowexception-celery-command-failed-the-recorded-hostname-does-not-match-t]

*Summary:*

This issue is a symptom of another issue I just resolved: 
[AirflowException: Celery command failed - The recorded hostname does not match 
this instance's hostname][1].

For a while I didn't see the _AirflowException: Celery command failed_ error 
because it only appeared in the _airflow worker_ logs. It wasn't until I 
watched the worker logs in real time that I noticed that, whenever that error 
was thrown, I also got the BrokenPipeException in my task.

It gets somewhat stranger, though. I would only see the BrokenPipeException 
thrown if I used `print("something to log")` *and* the `AirflowException: Celery 
command failed...` error occurred on the worker node. When I changed all of my 
print statements to `import logging ... logging.info("something to log")`, 
the BrokenPipeException went away, *but* the task still failed because of the 
`AirflowException: Celery command failed...` error. Had I not seen the 
BrokenPipeException in my Airflow task logs, though, I wouldn't have known why 
the task was failing, because once I eliminated the print statements no error 
appeared in the task logs at all (only in the _airflow worker_ logs).
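This difference in behavior matches how Python's standard library handles write failures: `print()` flushes the stream directly, so a dead pipe raises into the caller, while the `logging` module traps exceptions inside `Handler.emit()` and routes them to `Handler.handleError()`. A minimal sketch, using a hypothetical stand-in stream whose reader has gone away (not Airflow's actual stdout plumbing):

```python
import io
import logging

class BrokenPipeStream(io.StringIO):
    """Hypothetical stand-in for a stdout whose reading end has closed."""
    def flush(self):
        raise BrokenPipeError(32, "Broken pipe")

stream = BrokenPipeStream()

# print() writes and flushes the stream itself, so the error propagates
# straight into the task code:
print_raised = False
try:
    print("something to log", file=stream, flush=True)
except BrokenPipeError:
    print_raised = True

# logging wraps emit() in a try/except and hands failures to handleError();
# with raiseExceptions=False it stays silent, with the default True it prints
# the "--- Logging error ---" report seen in the worker logs above.
logging.raiseExceptions = False
logger = logging.getLogger("brokenpipe_demo")
logger.addHandler(logging.StreamHandler(stream))
logger.warning("something to log")  # returns normally despite the broken stream
```

That is consistent with what I observed: the broken pipe only surfaced as an exception in the task when `print` was doing the writing.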

So, long story short, there are a few takeaways:
 # Don't use `print("something to log")`; use Airflow's built-in logging by 
importing the logging module, i.e. `import logging` and then 
`logging.info("something to log")`.
 # If you're using an AWS EC2 instance as your Airflow server, you may be 
experiencing this issue: 
https://github.com/apache/incubator-airflow/pull/2484. A fix has already been 
merged into Airflow 1.10 (I'm currently using Airflow 1.9), so [upgrade your 
Airflow version to 1.10][2]. You can also use [the command here][3], but 
running `pip install git+git://github.com/apache/incubator-airflow.git` gave 
me version `v2.0.0.dev0+incubating`; I'm not sure whether that's a development 
version, but it appears to be newer than 1.10, so perhaps it'll work. If you 
don't want to upgrade your Airflow version, you could instead follow the steps 
in [the GitHub PR][4] to either manually apply the fix to the file or fork 
Airflow and cherry-pick the commit that fixes it.
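To illustrate takeaway #1, here is a minimal sketch of a task callable using the logging module instead of print. The function name `check_cluster` and its messages are hypothetical, not taken from the DAGs above:

```python
import logging

# Hypothetical callable for a PythonOperator. Records emitted through the
# logging module are picked up by Airflow's task log handlers, whereas a bare
# print() writes to the task subprocess's stdout pipe.
def check_cluster(**context):
    logging.info("something to log")                    # instead of print(...)
    logging.warning("EMR cluster not ready, retrying")  # log levels survive too
```

With `print`, output goes through the subprocess's stdout pipe, which is exactly where the BrokenPipeError in the traces above originated.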

*Sources:*
 [1]: 
https://stackoverflow.com/questions/51775370/airflowexception-celery-command-failed-the-recorded-hostname-does-not-match-t
 [2]: https://github.com/apache/incubator-airflow/releases
 [3]: https://stackoverflow.com/a/47540377/3299397
 [4]: https://github.com/apache/incubator-airflow/pull/2484

> Airflow Logs BrokenPipeException
> --------------------------------
>
>                 Key: AIRFLOW-2844
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2844
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: logging
>    Affects Versions: 1.9.0
>            Reporter: Kyle Bridenstine
>            Priority: Critical
>
> I'm using a clustered Airflow environment where I have four AWS ec2-instances 
> for the servers.
> *ec2-instances*
>  - Server 1: Webserver, Scheduler, Redis Queue, PostgreSQL Database
>  - Server 2: Webserver
>  - Server 3: Worker
>  - Server 4: Worker
> My setup has been working perfectly fine for three months now but 
> sporadically about once a week I get a Broken Pipe Exception when Airflow is 
> attempting to log something.
> {code:java}
> Log file isn't local.
> Fetching here: 
> http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log
> [2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
> [2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
> [2018-07-16 00:00:15,710] {models.py:1407} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing 
> <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
> [2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd 
> DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
> [2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key 
> [celery/celery_ssl_active] not found in config
> [2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor 
> will run without SSL
> [2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
> [2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from 
> /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
> [2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---
> [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:
> [2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 78, in execute
> while not self.poke(context):
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
> directory = os.listdir(full_path)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, 
> in handle_timeout
> self.log.error("Process timed out")
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process 
> timed out'
> Arguments: ()
> [2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, 
> in _run_raw_task
> result = task_copy.execute(context=context)
> File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", 
> line 78, in execute
> while not self.poke(context):
> File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
> directory = os.listdir(full_path)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 
> 37, in handle_timeout
> raise AirflowTaskTimeout(self.error_message)
> airflow.exceptions.AirflowTaskTimeout: Timeout
> [2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
> [2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout
> {code}
> Sometimes the error will also say
> {code:java}
> Log file isn't local.
> Fetching here: 
> http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
> Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: 
> http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
> {code}
> I'm not sure why the logs are working ~95% of the time but are randomly 
> failing at other times. Here are my log settings in my Airflow.cfg file,
> {code:java}
> # The folder where airflow should store its log files
> # This path must be absolute
> base_log_folder = /home/ec2-user/airflow/logs
> # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
> # must supply an Airflow connection id that provides access to the storage
> # location.
> remote_log_conn_id =
> encrypt_s3_logs = False
> # Logging level
> logging_level = INFO
> # Logging class
> # Specify the class that will specify the logging configuration
> # This class has to be on the python classpath
> # logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
> logging_config_class =
> # Log format
> log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - 
> %%(message)s
> simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
> # Name of handler to read task instance logs.
> # Default to use file task handler.
> task_log_reader = file.task
> # Log files for the gunicorn webserver. '-' means log to stderr.
> access_logfile = -
> error_logfile = 
> # The amount of time (in secs) webserver will wait for initial handshake
> # while fetching logs from other worker machine
> log_fetch_timeout_sec = 5
> # When you start an airflow worker, airflow starts a tiny web server
> # subprocess to serve the workers local log files to the airflow main
> # web server, who then builds pages and sends them to users. This defines
> # the port on which the logs are served. It needs to be unused, and open
> # visible from the main web server to connect into the workers.
> worker_log_server_port = 8793
> # How often should stats be printed to the logs
> print_stats_interval = 30
> child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
> {code}
> I'm wondering if maybe I should try a different technique for my logging such 
> as writing to an S3 Bucket or if there is something else I can do to fix this 
> issue.
> *Update:*
> Writing the logs to S3 did not resolve this issue. Also, the error is more 
> consistent now (still sporadic). It's happening more like 50% of the time 
> now. One thing I noticed is that the task it's happening on is my AWS EMR 
> creation task. Starting an AWS EMR cluster takes about 20 minutes and then 
> the task has to wait for the Spark commands to run on the EMR cluster. So the 
> single task is running for about 30 minutes. I'm wondering if this is too 
> long for an Airflow task to be running and if that's why it starts to fail 
> writing to the logs. If this is the case then I could breakup the EMR task so 
> that there is one task for the EMR creation, then another task for the Spark 
> commands on the EMR cluster.
>  *Update 2:*
> Sometimes I see slightly different details in the output of the error such as 
> this
> {code:java}
> *** Log file isn't local.
> *** Fetching here: 
> http://ip-1-2-3-4:1234/log/dag_name/task_name/2018-07-30T00:00:00/9.log
> [2018-08-01 15:24:52,002] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-08-01 15:24:52,383] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: dag_name.task_name 2018-07-30 00:00:00 [queued]>
> [2018-08-01 15:24:52,538] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: dag_name.task_name 2018-07-30 00:00:00 [queued]>
> [2018-08-01 15:24:52,538] {models.py:1407} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 9 of 9
> --------------------------------------------------------------------------------
> [2018-08-01 15:24:52,618] {models.py:1428} INFO - Executing 
> <Task(OmegaFileSensor): task_name> on 2018-07-30 00:00:00
> [2018-08-01 15:24:52,619] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', 'airflow run dag_name task_name 2018-07-30T00:00:00 --job_id 410 --raw 
> -sd DAGS_FOLDER/dag_name.py']
> [2018-08-01 15:25:00,650] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:00,649] {configuration.py:206} WARNING - section/key 
> [celery/celery_ssl_active] not found in config
> [2018-08-01 15:25:00,682] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:00,682] {default_celery.py:41} WARNING - Celery Executor 
> will run without SSL
> [2018-08-01 15:25:00,684] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:00,683] {__init__.py:45} INFO - Using executor 
> CeleryExecutor
> [2018-08-01 15:25:01,151] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:01,151] {models.py:189} INFO - Filling up the DagBag from 
> /var/lib/airflow/dags/dag_name.py
> [2018-08-01 15:25:03,909] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-08-02 13:13:47,130] {models.py:1470} ERROR - Killing subprocess
> [2018-08-02 13:13:47,560] {models.py:1595} ERROR - Task received SIGTERM 
> signal
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, 
> in _run_raw_task
> result = task_copy.execute(context=context)
> File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", 
> line 84, in execute
> sleep(self.poke_interval)
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, 
> in signal_handler
> raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,561] {models.py:1624} INFO - Marking task as FAILED.
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 70, in execute
> self.log.info("Tmp dir root location: \n %s", gettempdir())
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - Message: 'Tmp dir 
> root location: \n %s'
> Arguments: ('/tmp',)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,719] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 80, in execute
> script_location
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Message: 'Temporary 
> script location: %s'
> Arguments: 
> ('/tmp/airflowtmp81ztdhnm//tmp/airflowtmp81ztdhnm/failure_task_notificationlzixk4ar',)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 88, in execute
> self.log.info("Running command: %s", bash_command)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Message: 'Running 
> command: %s'
> Arguments: ('aws sns publish --topic-arn 
> arn:aws:sns:us-east-1:464816863426:datalake_digital_platform_arl_airflow_workflow
>  --message "The ARL Airflow Workflow <TaskInstance: dag_name.task_name 
> 2018-07-30 00:00:00 [failed]> task has failed." --region us-east-1',)
> [2018-08-02 13:13:47,835] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 97, in execute
> self.log.info("Output:")
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - Message: 'Output:'
> Arguments: ()
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 101, in execute
> self.log.info(line)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Message: '{'
> Arguments: ()
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - --- Logging error ---
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
>     result = task_copy.execute(context=context)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 84, in execute
>     sleep(self.poke_interval)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in signal_handler
>     raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
>     self.flush()
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
>     self.stream.flush()
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/bin/airflow", line 27, in <module>
>     args.func(args)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
>     pool=args.pool,
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in _run_raw_task
>     self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in handle_failure
>     task.on_failure_callback(context)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING -   File "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
>     return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 101, in execute
>     self.log.info(line)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Message: '"MessageId": "7c284092-dc0c-539b-97b9-134fe219aacd"'
> Arguments: ()
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - --- Logging error ---
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
>     result = task_copy.execute(context=context)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 84, in execute
>     sleep(self.poke_interval)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in signal_handler
>     raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
>     self.flush()
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
>     self.stream.flush()
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/bin/airflow", line 27, in <module>
>     args.func(args)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
>     pool=args.pool,
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
>     result = func(*args, **kwargs)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in _run_raw_task
>     self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in handle_failure
>     task.on_failure_callback(context)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING -   File "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
>     return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING -   File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 101, in execute
>     self.log.info(line)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - Message: '}'
> Arguments: ()
> [2018-08-02 13:13:58,639] {models.py:1470} ERROR - Killing subprocess
> [2018-08-02 13:13:58,640] {models.py:1638} ERROR - Failed at executing callback
> [2018-08-02 13:13:58,640] {models.py:1639} ERROR - Task received SIGTERM signal
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 84, in execute
>     sleep(self.poke_interval)
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in signal_handler
>     raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in handle_failure
>     task.on_failure_callback(context)
>   File "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
>     return failure_task_notification.execute(context=context)
>   File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 99, in execute
>     for line in iter(sp.stdout.readline, b''):
>   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in signal_handler
>     raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:59,126] {models.py:1644} ERROR - Task received SIGTERM signal
> {code}
> *Note:*
> I've also created a Stackoverflow question for this issue here 
> [https://stackoverflow.com/questions/51365911/airflow-logs-brokenpipeexception]
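For reference, the print-to-logging swap described above looks roughly like the sketch below. This is illustrative only: `my_task_callable` is a hypothetical task name, not from the original DAG, and the comments state my assumptions about why `print` misbehaves here.

```python
import logging

# Assumption: print() writes to the task process's stdout pipe, which can
# raise BrokenPipeError when the Celery worker tears the process down,
# while logging handlers are managed by Airflow's logging setup.
log = logging.getLogger(__name__)


def my_task_callable(**context):
    # Hypothetical task body. Instead of:
    #     print("something to log")
    # use the logging API:
    log.info("something to log")
```

In a real DAG this callable would be wired into a PythonOperator; the point is simply that every `print("...")` becomes a `log.info("...")` (or `log.warning`, etc.), so output goes through logging handlers rather than a raw stdout pipe.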



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)