[ https://issues.apache.org/jira/browse/AIRFLOW-2844?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Sajid Sajid reassigned AIRFLOW-2844: ------------------------------------ Assignee: Sajid Sajid > Airflow Logs BrokenPipeException > -------------------------------- > > Key: AIRFLOW-2844 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2844 > Project: Apache Airflow > Issue Type: Bug > Components: logging > Affects Versions: 1.9.0 > Reporter: Kyle Bridenstine > Assignee: Sajid Sajid > Priority: Critical > > I'm using a clustered Airflow environment where I have four AWS ec2-instances > for the servers. > *ec2-instances* > - Server 1: Webserver, Scheduler, Redis Queue, PostgreSQL Database > - Server 2: Webserver > - Server 3: Worker > - Server 4: Worker > My setup has been working perfectly fine for three months now but > sporadically about once a week I get a Broken Pipe Exception when Airflow is > attempting to log something. > {code:java} > Log file isn't local. > Fetching here: > http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log > [2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4 > [2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for > <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]> > [2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for > <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]> > [2018-07-16 00:00:15,710] {models.py:1407} INFO - > -------------------------------------------------------------------------------- > Starting attempt 1 of 1 > -------------------------------------------------------------------------------- > [2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing > <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00 > [2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', > '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd > DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py'] > [2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: > [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key > [celery/celery_ssl_active] not found in config > [2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: > [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor > will run without SSL > [2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: > [2018-07-16 00:00:16,533] {_init_.py:45} INFO - Using executor CeleryExecutor > [2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: > [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from > /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py > [2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4 > [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - — Logging error — > [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/_init_.py", line 996, in emit > self.flush() > [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/_init_.py", line 976, in flush > self.stream.flush() > [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack: > [2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 78, in execute > while not self.poke(context): > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File > "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke > directory = os.listdir(full_path) > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, > in handle_timeout > self.log.error("Process timed out") > [2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process > timed out' > Arguments: () > [2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout > Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, > in _run_raw_task > result = task_copy.execute(context=context) > File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", > line 78, in execute > while not self.poke(context): > File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke > directory = os.listdir(full_path) > File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line > 37, in handle_timeout > raise AirflowTaskTimeout(self.error_message) > airflow.exceptions.AirflowTaskTimeout: Timeout > [2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED. > [2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout > {code} > Sometimes the error will also say > {code:java} > Log file isn't local. > Fetching here: > http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log > Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: > http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log > {code} > I'm not sure why the logs are working ~95% of the time but are randomly > failing at other times. Here are my log settings in my Airflow.cfg file, > {code:java} > # The folder where airflow should store its log files > # This path must be absolute > base_log_folder = /home/ec2-user/airflow/logs > # Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users > # must supply an Airflow connection id that provides access to the storage > # location. > remote_log_conn_id = > encrypt_s3_logs = False > # Logging level > logging_level = INFO > # Logging class > # Specify the class that will specify the logging configuration > # This class has to be on the python classpath > # logging_config_class = my.path.default_local_settings.LOGGING_CONFIG > logging_config_class = > # Log format > log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - > %%(message)s > simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s > # Name of handler to read task instance logs. > # Default to use file task handler. > task_log_reader = file.task > # Log files for the gunicorn webserver. '-' means log to stderr. > access_logfile = - > error_logfile = > # The amount of time (in secs) webserver will wait for initial handshake > # while fetching logs from other worker machine > log_fetch_timeout_sec = 5 > # When you start an airflow worker, airflow starts a tiny web server > # subprocess to serve the workers local log files to the airflow main > # web server, who then builds pages and sends them to users. This defines > # the port on which the logs are served. It needs to be unused, and open > # visible from the main web server to connect into the workers. > worker_log_server_port = 8793 > # How often should stats be printed to the logs > print_stats_interval = 30 > child_process_log_directory = /home/ec2-user/airflow/logs/scheduler{code} > I'm wondering if maybe I should try a different technique for my logging such > as writing to an S3 Bucket or if there is something else I can do to fix this > issue. > *Update:* > Writing the logs to S3 did not resolve this issue. Also, the error is more > consistent now (still sporadic). It's happening more like 50% of the time > now. One thing I noticed is that the task it's happening on is my AWS EMR > creation task. Starting an AWS EMR cluster takes about 20 minutes and then > the task has to wait for the Spark commands to run on the EMR cluster. So the > single task is running for about 30 minutes. I'm wondering if this is too > long for an Airflow task to be running and if that's why it starts to fail > writing to the logs. If this is the case then I could breakup the EMR task so > that there is one task for the EMR creation, then another task for the Spark > commands on the EMR cluster. > *Update 2:* > Sometimes I see slightly different details in the output of the error such as > this > {code:java} > *** Log file isn't local. > *** Fetching here: > http://ip-1-2-3-4:1234/log/dag_name/task_name/2018-07-30T00:00:00/9.log > [2018-08-01 15:24:52,002] {cli.py:374} INFO - Running on host ip-1-2-3-4 > [2018-08-01 15:24:52,383] {models.py:1197} INFO - Dependencies all met for > <TaskInstance: dag_name.task_name 2018-07-30 00:00:00 [queued]> > [2018-08-01 15:24:52,538] {models.py:1197} INFO - Dependencies all met for > <TaskInstance: dag_name.task_name 2018-07-30 00:00:00 [queued]> > [2018-08-01 15:24:52,538] {models.py:1407} INFO - > -------------------------------------------------------------------------------- > Starting attempt 9 of 9 > -------------------------------------------------------------------------------- > [2018-08-01 15:24:52,618] {models.py:1428} INFO - Executing > <Task(OmegaFileSensor): task_name> on 2018-07-30 00:00:00 > [2018-08-01 15:24:52,619] {base_task_runner.py:115} INFO - Running: ['bash', > '-c', 'airflow run dag_name task_name 2018-07-30T00:00:00 --job_id 410 --raw > -sd DAGS_FOLDER/dag_name.py'] > [2018-08-01 15:25:00,650] {base_task_runner.py:98} INFO - Subtask: > [2018-08-01 15:25:00,649] {configuration.py:206} WARNING - section/key > [celery/celery_ssl_active] not found in config > [2018-08-01 15:25:00,682] {base_task_runner.py:98} INFO - Subtask: > [2018-08-01 15:25:00,682] {default_celery.py:41} WARNING - Celery Executor > will run without SSL > [2018-08-01 15:25:00,684] {base_task_runner.py:98} INFO - Subtask: > [2018-08-01 15:25:00,683] {__init__.py:45} INFO - Using executor > CeleryExecutor > [2018-08-01 15:25:01,151] {base_task_runner.py:98} INFO - Subtask: > [2018-08-01 15:25:01,151] {models.py:189} INFO - Filling up the DagBag from > /var/lib/airflow/dags/dag_name.py > [2018-08-01 15:25:03,909] {cli.py:374} INFO - Running on host ip-1-2-3-4 > [2018-08-02 13:13:47,130] {models.py:1470} ERROR - Killing subprocess > [2018-08-02 13:13:47,560] {models.py:1595} ERROR - Task received SIGTERM > signal > Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, > in _run_raw_task > result = task_copy.execute(context=context) > File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", > line 84, in execute > sleep(self.poke_interval) > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, > in signal_handler > raise AirflowException("Task received SIGTERM signal") > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:47,561] {models.py:1624} INFO - Marking task as FAILED. > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:47,639] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:47,640] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 70, in execute > self.log.info("Tmp dir root location: \n %s", gettempdir()) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - Message: 'Tmp dir > root location: \n %s' > Arguments: ('/tmp',) > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,641] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:47,642] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,719] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 80, in execute > script_location > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Message: 'Temporary > script location: %s' > Arguments: > ('/tmp/airflowtmp81ztdhnm//tmp/airflowtmp81ztdhnm/failure_task_notificationlzixk4ar',) > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:47,720] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 88, in execute > self.log.info("Running command: %s", bash_command) > [2018-08-02 13:13:47,721] {logging_mixin.py:84} WARNING - Message: 'Running > command: %s' > Arguments: ('aws sns publish --topic-arn > arn:aws:sns:us-east-1:464816863426:datalake_digital_platform_arl_airflow_workflow > --message "The ARL Airflow Workflow <TaskInstance: dag_name.task_name > 2018-07-30 00:00:00 [failed]> task has failed." --region us-east-1',) > [2018-08-02 13:13:47,835] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:47,836] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 97, in execute > self.log.info("Output:") > [2018-08-02 13:13:47,837] {logging_mixin.py:84} WARNING - Message: 'Output:' > Arguments: () > [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:57,731] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 101, in execute > self.log.info(line) > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - Message: '{' > Arguments: () > [2018-08-02 13:13:57,732] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:57,733] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 101, in execute > self.log.info(line) > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Message: > '"MessageId": "7c284092-dc0c-539b-97b9-134fe219aacd"' > Arguments: () > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - --- Logging error > --- > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in > _run_raw_task > result = task_copy.execute(context=context) > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line > 84, in execute > sleep(self.poke_interval) > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, in > signal_handler > raise AirflowException("Task received SIGTERM signal") > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - > During handling of the above exception, another exception occurred: > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Traceback (most > recent call last): > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit > self.flush() > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush > self.stream.flush() > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - BrokenPipeError: > [Errno 32] Broken pipe > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - Call stack: > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/bin/airflow", line 27, in <module> > args.func(args) > [2018-08-02 13:13:57,734] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run > pool=args.pool, > [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in > wrapper > result = func(*args, **kwargs) > [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1530, in > _run_raw_task > self.handle_failure(e, test_mode, context) > [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, in > handle_failure > task.on_failure_callback(context) > [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File > "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 101, in execute > self.log.info(line) > [2018-08-02 13:13:57,735] {logging_mixin.py:84} WARNING - Message: '}' > Arguments: () > [2018-08-02 13:13:58,639] {models.py:1470} ERROR - Killing subprocess > [2018-08-02 13:13:58,640] {models.py:1638} ERROR - Failed at executing > callback > [2018-08-02 13:13:58,640] {models.py:1639} ERROR - Task received SIGTERM > signal > Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, > in _run_raw_task > result = task_copy.execute(context=context) > File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", > line 84, in execute > sleep(self.poke_interval) > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, > in signal_handler > raise AirflowException("Task received SIGTERM signal") > airflow.exceptions.AirflowException: Task received SIGTERM signal > During handling of the above exception, another exception occurred: > Traceback (most recent call last): > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1636, > in handle_failure > task.on_failure_callback(context) > File "/var/lib/airflow/dags/dag_name.py", line 36, in on_failure_callback > return failure_task_notification.execute(context=context) > File > "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", > line 99, in execute > for line in iter(sp.stdout.readline, b''): > File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1472, > in signal_handler > raise AirflowException("Task received SIGTERM signal") > airflow.exceptions.AirflowException: Task received SIGTERM signal > [2018-08-02 13:13:59,126] {models.py:1644} ERROR - Task received SIGTERM > signal{code} > *Note:* > I've also created a Stackoverflow question for this issue here > [https://stackoverflow.com/questions/51365911/airflow-logs-brokenpipeexception] -- This message was sent by Atlassian Jira (v8.20.1#820001)