[ 
https://issues.apache.org/jira/browse/AIRFLOW-2844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sajid Sajid reassigned AIRFLOW-2844:
------------------------------------

    Assignee: Sajid Sajid

> Airflow Logs BrokenPipeException
> --------------------------------
>
>                 Key: AIRFLOW-2844
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2844
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: logging
>    Affects Versions: 1.9.0
>            Reporter: Kyle Bridenstine
>            Assignee: Sajid Sajid
>            Priority: Critical
>
> I'm running a clustered Airflow environment on four AWS EC2 instances.
> *ec2-instances*
>  - Server 1: Webserver, Scheduler, Redis Queue, PostgreSQL Database
>  - Server 2: Webserver
>  - Server 3: Worker
>  - Server 4: Worker
> My setup has been working fine for three months, but sporadically, about 
> once a week, I get a Broken Pipe exception when Airflow attempts to log 
> something.
> {code:java}
> Log file isn't local.
> Fetching here: 
> http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log
> [2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
> [2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
> [2018-07-16 00:00:15,710] {models.py:1407} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 1 of 1
> --------------------------------------------------------------------------------
> [2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing 
> <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
> [2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd 
> DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
> [2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key 
> [celery/celery_ssl_active] not found in config
> [2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor 
> will run without SSL
> [2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
> [2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: 
> [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from 
> /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
> [2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---
> [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:
> [2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 78, in execute
> while not self.poke(context):
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
> directory = os.listdir(full_path)
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, 
> in handle_timeout
> self.log.error("Process timed out")
> [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process 
> timed out'
> Arguments: ()
> [2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, 
> in _run_raw_task
> result = task_copy.execute(context=context)
> File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", 
> line 78, in execute
> while not self.poke(context):
> File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
> directory = os.listdir(full_path)
> File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 
> 37, in handle_timeout
> raise AirflowTaskTimeout(self.error_message)
> airflow.exceptions.AirflowTaskTimeout: Timeout
> [2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
> [2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout
> {code}
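For reference, here is a minimal stand-alone sketch of how the standard library ends up printing a "--- Logging error ---" report with `BrokenPipeError`. It assumes, and the report does not confirm this, that the stream behind the task's log handler is a pipe whose reading side has gone away. `RecordingHandler` and `log_to_closed_pipe` are hypothetical names used only for illustration; this is not Airflow code.

```python
import logging
import os
import sys


class RecordingHandler(logging.StreamHandler):
    """StreamHandler that records emit() failures instead of printing them."""

    def __init__(self, stream):
        super().__init__(stream)
        self.errors = []

    def handleError(self, record):
        # The default implementation prints "--- Logging error ---" plus the
        # traceback to stderr; here the exception is kept for inspection.
        self.errors.append(sys.exc_info()[1])


def log_to_closed_pipe():
    """Log one message to a pipe nobody reads and return the errors seen."""
    read_fd, write_fd = os.pipe()
    stream = os.fdopen(write_fd, "w")
    os.close(read_fd)  # the reading side disappears

    handler = RecordingHandler(stream)
    logger = logging.getLogger("broken_pipe_demo")
    logger.propagate = False
    logger.setLevel(logging.INFO)
    logger.addHandler(handler)
    try:
        # emit() writes the message and then flushes; the flush hits EPIPE.
        logger.error("Process timed out")
    finally:
        logger.removeHandler(handler)
        try:
            stream.close()
        except OSError:
            pass  # closing flushes again and may fail the same way
    return handler.errors


if __name__ == "__main__":
    print(log_to_closed_pipe())
```

If this is what happens here, the broken pipe would be a symptom of the reader of the task's output going away rather than the original cause of the failure. Note that the log above also ends in an `AirflowTaskTimeout`.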
> Sometimes the error instead says:
> {code:java}
> Log file isn't local.
> Fetching here: 
> http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
> Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: 
> http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
> {code}
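To check a 404 like this by hand, it can help to map the URL back to a file on the worker. The sketch below rebuilds the URL shown above from its parts and derives the path where the worker would be expected to hold the file. The directory layout (`dag_id/task_id/execution_date/try_number.log` under `base_log_folder`) is an assumption inferred from the URL, and both function names are hypothetical.

```python
import posixpath


def worker_log_url(hostname, port, dag_id, task_id, execution_date, try_number):
    """Build the URL the webserver fetches from the worker's log server."""
    relative = "{}/{}/{}/{}.log".format(dag_id, task_id, execution_date, try_number)
    return "http://{}:{}/log/{}".format(hostname, port, relative)


def local_log_path(base_log_folder, dag_id, task_id, execution_date, try_number):
    """Assumed on-disk location of the same log file on the worker."""
    return posixpath.join(
        base_log_folder, dag_id, task_id, execution_date,
        "{}.log".format(try_number),
    )


if __name__ == "__main__":
    print(worker_log_url("ip-1-2-3-4", 8793, "foobar", "task_1",
                         "2018-07-12T00:00:00", 1))
    print(local_log_path("/home/ec2-user/airflow/logs", "foobar", "task_1",
                         "2018-07-12T00:00:00", 1))
```

If the file is missing at that path on the worker named in the URL, the task may have run on, or been recorded against, a different host than the one that wrote the log.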
> I'm not sure why the logs work ~95% of the time but fail randomly at other 
> times. Here are the log settings in my airflow.cfg file:
> {code:java}
> # The folder where airflow should store its log files
> # This path must be absolute
> base_log_folder = /home/ec2-user/airflow/logs
> # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
> # must supply an Airflow connection id that provides access to the storage
> # location.
> remote_log_conn_id =
> encrypt_s3_logs = False
> # Logging level
> logging_level = INFO
> # Logging class
> # Specify the class that will specify the logging configuration
> # This class has to be on the python classpath
> # logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
> logging_config_class =
> # Log format
> log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - 
> %%(message)s
> simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
> # Name of handler to read task instance logs.
> # Default to use file task handler.
> task_log_reader = file.task
> # Log files for the gunicorn webserver. '-' means log to stderr.
> access_logfile = -
> error_logfile = 
> # The amount of time (in secs) webserver will wait for initial handshake
> # while fetching logs from other worker machine
> log_fetch_timeout_sec = 5
> # When you start an airflow worker, airflow starts a tiny web server
> # subprocess to serve the workers local log files to the airflow main
> # web server, who then builds pages and sends them to users. This defines
> # the port on which the logs are served. It needs to be unused, and open
> # visible from the main web server to connect into the workers.
> worker_log_server_port = 8793
> # How often should stats be printed to the logs
> print_stats_interval = 30
> child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
> {code}
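As a quick sanity check on these settings, the sketch below parses a fragment of them with the standard library's `configparser` and formats a sample record. It shows that the doubled `%%` in `log_format` is only INI escaping for a single `%`. Placing the keys under a `[core]` section is an assumption, since the section header is not part of the excerpt, and Airflow uses its own parser subclass, so this only approximates how the values are read.

```python
import configparser
import logging

# Fragment of the settings above; the [core] header is assumed.
CFG_FRAGMENT = """
[core]
base_log_folder = /home/ec2-user/airflow/logs
logging_level = INFO
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
"""


def load_log_settings(text):
    """Return (base_log_folder, logging_level, log_format) from INI text."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    core = parser["core"]
    # Reading a value applies interpolation, which turns "%%" into "%".
    return core["base_log_folder"], core["logging_level"], core["log_format"]


def format_sample(log_format):
    """Format one hand-built record with the configured format string."""
    record = logging.LogRecord(
        name="demo", level=logging.ERROR, pathname="models.py",
        lineno=1595, msg="Timeout", args=(), exc_info=None,
    )
    return logging.Formatter(log_format).format(record)


if __name__ == "__main__":
    folder, level, log_format = load_log_settings(CFG_FRAGMENT)
    print(folder, level)
    print(format_sample(log_format))
```

The formatted sample has the same shape as the lines in the logs above, which suggests the format settings themselves are not the problem.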
> I'm wondering whether I should try a different logging approach, such as 
> writing to an S3 bucket, or whether there is something else I can do to fix 
> this issue.
> *Update:*
> Writing the logs to S3 did not resolve this issue. The error is also more 
> frequent now, though still sporadic: it happens roughly 50% of the time. 
> One thing I noticed is that the task it happens on is my AWS EMR creation 
> task. Starting an AWS EMR cluster takes about 20 minutes, and then the task 
> has to wait for the Spark commands to run on the EMR cluster, so the single 
> task runs for about 30 minutes. I'm wondering whether this is too long for 
> an Airflow task to run and whether that's why it starts failing to write to 
> the logs. If so, I could break up the EMR task into one task for the EMR 
> creation and another for the Spark commands on the EMR cluster.
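On whether 30 minutes is too long: the first log in this report ends with `AirflowTaskTimeout` raised from `airflow/utils/timeout.py`, which appears to be a per-task time limit firing rather than a limit on logging. The sketch below shows the general technique of a signal-based timeout around a block of work. It is a simplified stand-in, not Airflow's implementation: `TaskTimeout`, `timeout` and `run_with_limit` are hypothetical names, and it only works on Unix in the main thread.

```python
import signal
import time


class TaskTimeout(Exception):
    """Raised when the guarded block runs longer than allowed."""


class timeout:
    """Context manager that interrupts its body after `seconds` seconds."""

    def __init__(self, seconds, error_message="Timeout"):
        self.seconds = seconds
        self.error_message = error_message

    def handle_timeout(self, signum, frame):
        # Runs inside whatever the main thread was doing when SIGALRM arrived.
        raise TaskTimeout(self.error_message)

    def __enter__(self):
        self._previous = signal.signal(signal.SIGALRM, self.handle_timeout)
        signal.setitimer(signal.ITIMER_REAL, self.seconds)
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        signal.setitimer(signal.ITIMER_REAL, 0)  # cancel the pending alarm
        signal.signal(signal.SIGALRM, self._previous)
        return False  # let TaskTimeout propagate to the caller


def run_with_limit(work_seconds, limit_seconds):
    """Sleep for work_seconds under a limit and report how it ended."""
    try:
        with timeout(limit_seconds):
            time.sleep(work_seconds)
        return "finished"
    except TaskTimeout:
        return "timed out"


if __name__ == "__main__":
    print(run_with_limit(0.01, 1.0))
    print(run_with_limit(1.0, 0.05))
```

With a limit of this kind, a step that legitimately takes 30 minutes needs a limit above that. Splitting the work into two tasks would let each one have its own limit.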
> *Update 2:*
> Sometimes the error output differs slightly, for example:
> {code:java}
> *** Log file isn't local.
> *** Fetching here: 
> http://ip-1-2-3-4:1234/log/dag_name/task_name/2018-07-30T00:00:00/9.log
> [2018-08-01 15:24:52,002] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-08-01 15:24:52,383] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: dag_name.task_name 2018-07-30 00:00:00 [queued]>
> [2018-08-01 15:24:52,538] {models.py:1197} INFO - Dependencies all met for 
> <TaskInstance: dag_name.task_name 2018-07-30 00:00:00 [queued]>
> [2018-08-01 15:24:52,538] {models.py:1407} INFO - 
> --------------------------------------------------------------------------------
> Starting attempt 9 of 9
> --------------------------------------------------------------------------------
> [2018-08-01 15:24:52,618] {models.py:1428} INFO - Executing 
> <Task(OmegaFileSensor): task_name> on 2018-07-30 00:00:00
> [2018-08-01 15:24:52,619] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', 'airflow run dag_name task_name 2018-07-30T00:00:00 --job_id 410 --raw 
> -sd DAGS_FOLDER/dag_name.py']
> [2018-08-01 15:25:00,650] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:00,649] {configuration.py:206} WARNING - section/key 
> [celery/celery_ssl_active] not found in config
> [2018-08-01 15:25:00,682] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:00,682] {default_celery.py:41} WARNING - Celery Executor 
> will run without SSL
> [2018-08-01 15:25:00,684] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:00,683] {__init__.py:45} INFO - Using executor 
> CeleryExecutor
> [2018-08-01 15:25:01,151] {base_task_runner.py:98} INFO - Subtask: 
> [2018-08-01 15:25:01,151] {models.py:189} INFO - Filling up the DagBag from 
> /var/lib/airflow/dags/dag_name.py
> [2018-08-01 15:25:03,909] {cli.py:374} INFO - Running on host ip-1-2-3-4
> [2018-08-02 13:13:47,130] {models.py:1470} ERROR - Killing subprocess
> [2018-08-02 13:13:47,560] {models.py:1595} ERROR - Task received SIGTERM 
> signal
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, 
> in _run_raw_task
> result = task_copy.execute(context=context)
> File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", 
> line 84, in execute
> sleep(self.poke_interval)
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, 
> in signal_handler
> raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,561] {models.py:1624} INFO - Marking task as FAILED.
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 70, in execute
> self.log.info("Tmp dir root location: \n %s", gettempdir())
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - Message: 'Tmp dir 
> root location: \n %s'
> Arguments: ('/tmp',)
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,719] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 80, in execute
> script_location
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Message: 'Temporary 
> script location: %s'
> Arguments: 
> ('/tmp/airflowtmp81ztdhnm//tmp/airflowtmp81ztdhnm/failure_task_notificationlzixk4ar',)
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 88, in execute
> self.log.info("Running command: %s", bash_command)
> [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Message: 'Running 
> command: %s'
> Arguments: ('aws sns publish --topic-arn 
> arn:aws:sns:us-east-1:464816863426:datalake_digital_platform_arl_airflow_workflow
>  --message "The ARL Airflow Workflow <TaskInstance: dag_name.task_name 
> 2018-07-30 00:00:00 [failed]> task has failed." --region us-east-1',)
> [2018-08-02 13:13:47,835] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 97, in execute
> self.log.info("Output:")
> [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - Message: 'Output:'
> Arguments: ()
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 101, in execute
> self.log.info(line)
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Message: '{'
> Arguments: ()
> [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 101, in execute
> self.log.info(line)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Message: 
> '"MessageId": "7c284092-dc0c-539b-97b9-134fe219aacd"'
> Arguments: ()
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - --- Logging error 
> ---
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in 
> _run_raw_task
> result = task_copy.execute(context=context)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 
> 84, in execute
> sleep(self.poke_interval)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in 
> signal_handler
> raise AirflowException("Task received SIGTERM signal")
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - 
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - 
> During handling of the above exception, another exception occurred:
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Traceback (most 
> recent call last):
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
> self.flush()
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
> self.stream.flush()
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - BrokenPipeError: 
> [Errno 32] Broken pipe
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Call stack:
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/bin/airflow", line 27, in <module>
> args.func(args)
> [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
> pool=args.pool,
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in 
> wrapper
> result = func(*args, **kwargs)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in 
> _run_raw_task
> self.handle_failure(e, test_mode, context)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in 
> handle_failure
> task.on_failure_callback(context)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File 
> "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 101, in execute
> self.log.info(line)
> [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - Message: '}'
> Arguments: ()
> [2018-08-02 13:13:58,639] {models.py:1470} ERROR - Killing subprocess
> [2018-08-02 13:13:58,640] {models.py:1638} ERROR - Failed at executing 
> callback
> [2018-08-02 13:13:58,640] {models.py:1639} ERROR - Task received SIGTERM 
> signal
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, 
> in _run_raw_task
> result = task_copy.execute(context=context)
> File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", 
> line 84, in execute
> sleep(self.poke_interval)
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, 
> in signal_handler
> raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> During handling of the above exception, another exception occurred:
> Traceback (most recent call last):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, 
> in handle_failure
> task.on_failure_callback(context)
> File "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback
> return failure_task_notification.execute(context=context)
> File 
> "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", 
> line 99, in execute
> for line in iter(sp.stdout.readline, b''):
> File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, 
> in signal_handler
> raise AirflowException("Task received SIGTERM signal")
> airflow.exceptions.AirflowException: Task received SIGTERM signal
> [2018-08-02 13:13:59,126] {models.py:1644} ERROR - Task received SIGTERM 
> signal
> {code}
> *Note:*
> I've also created a Stack Overflow question for this issue here: 
> [https://stackoverflow.com/questions/51365911/airflow-logs-brokenpipeexception]



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
