Repository: incubator-airflow Updated Branches: refs/heads/v1-10-test e561ea64c -> d80e4b663
[AIRFLOW-2363] Fix return type bug in TaskHandler Closes #3259 from yrqls21/kevin_yang_fix_s3_logging (cherry picked from commit 19b3901284b9f7a9f9a898dd1a1e823e5109cfa1) Signed-off-by: Fokko Driesprong <fokkodriespr...@godatadriven.com> Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d80e4b66 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d80e4b66 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d80e4b66 Branch: refs/heads/v1-10-test Commit: d80e4b6637404851d20ab8bb44fc079c46cb8a28 Parents: e561ea6 Author: Kevin Yang <kevin.y...@airbnb.com> Authored: Mon Apr 30 12:49:06 2018 +0200 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Mon Apr 30 12:49:40 2018 +0200 ---------------------------------------------------------------------- airflow/bin/cli.py | 1 + airflow/utils/log/gcs_task_handler.py | 11 ++++++----- airflow/utils/log/s3_task_handler.py | 9 ++++----- airflow/utils/log/wasb_task_handler.py | 9 ++++----- tests/utils/log/test_s3_task_handler.py | 12 ++++++++++-- 5 files changed, 25 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 8a92cfa..f26cbe4 100644 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -468,6 +468,7 @@ def run(args, dag=None): else: with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN): _run(args, dag, ti) + logging.shutdown() @cli_utils.action_logging http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/utils/log/gcs_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py index d4a9871..8c34792 100644 --- a/airflow/utils/log/gcs_task_handler.py +++ b/airflow/utils/log/gcs_task_handler.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -113,13 +113,14 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin): remote_log = self.gcs_read(remote_loc) log = '*** Reading remote log from {}.\n{}\n'.format( remote_loc, remote_log) + return log, {'end_of_log': True} except Exception as e: log = '*** Unable to read remote log from {}\n*** {}\n\n'.format( remote_loc, str(e)) self.log.error(log) - log += super(GCSTaskHandler, self)._read(ti, try_number) - - return log, {'end_of_log': True} + local_log, metadata = super(GCSTaskHandler, self)._read(ti, try_number) + log += local_log + return log, metadata def gcs_read(self, remote_log_location): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/utils/log/s3_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py index f29a92f..07b9b3e 100644 --- a/airflow/utils/log/s3_task_handler.py +++ b/airflow/utils/log/s3_task_handler.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -111,10 +111,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin): remote_log = self.s3_read(remote_loc, return_error=True) log = '*** Reading remote log from {}.\n{}\n'.format( remote_loc, remote_log) + return log, {'end_of_log': True} else: - log = super(S3TaskHandler, self)._read(ti, try_number) - - return log, {'end_of_log': True} + return super(S3TaskHandler, self)._read(ti, try_number) def s3_log_exists(self, remote_log_location): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/utils/log/wasb_task_handler.py ---------------------------------------------------------------------- diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py index 1a9590d..a2a0c0d 100644 --- a/airflow/utils/log/wasb_task_handler.py +++ b/airflow/utils/log/wasb_task_handler.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -119,10 +119,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin): remote_log = self.wasb_read(remote_loc, return_error=True) log = '*** Reading remote log from {}.\n{}\n'.format( remote_loc, remote_log) + return log, {'end_of_log': True} else: - log = super(WasbTaskHandler, self)._read(ti, try_number) - - return log, {'end_of_log': True} + return super(WasbTaskHandler, self)._read(ti, try_number) def wasb_log_exists(self, remote_log_location): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/tests/utils/log/test_s3_task_handler.py ---------------------------------------------------------------------- diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py index c287fbc..a5d5f15 100644 --- a/tests/utils/log/test_s3_task_handler.py +++ b/tests/utils/log/test_s3_task_handler.py @@ -7,9 +7,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -117,6 +117,14 @@ class TestS3TaskHandler(unittest.TestCase): 'Log line\n\n'], [{'end_of_log': True}]) ) + def test_read_when_s3_log_missing(self): + log, metadata = self.s3_task_handler.read(self.ti) + + self.assertEqual(1, len(log)) + self.assertEqual(len(log), len(metadata)) + self.assertIn('*** Log file does not exist:', log[0]) + self.assertEqual({'end_of_log': True}, metadata[0]) + def test_read_raises_return_error(self): handler = self.s3_task_handler url = 's3://nonexistentbucket/foo'