Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-10-test e561ea64c -> d80e4b663


[AIRFLOW-2363] Fix return type bug in TaskHandler

Closes #3259 from yrqls21/kevin_yang_fix_s3_logging

(cherry picked from commit 19b3901284b9f7a9f9a898dd1a1e823e5109cfa1)
Signed-off-by: Fokko Driesprong <fokkodriespr...@godatadriven.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d80e4b66
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d80e4b66
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d80e4b66

Branch: refs/heads/v1-10-test
Commit: d80e4b6637404851d20ab8bb44fc079c46cb8a28
Parents: e561ea6
Author: Kevin Yang <kevin.y...@airbnb.com>
Authored: Mon Apr 30 12:49:06 2018 +0200
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Mon Apr 30 12:49:40 2018 +0200

----------------------------------------------------------------------
 airflow/bin/cli.py                      |  1 +
 airflow/utils/log/gcs_task_handler.py   | 11 ++++++-----
 airflow/utils/log/s3_task_handler.py    |  9 ++++-----
 airflow/utils/log/wasb_task_handler.py  |  9 ++++-----
 tests/utils/log/test_s3_task_handler.py | 12 ++++++++++--
 5 files changed, 25 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 8a92cfa..f26cbe4 100644
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -468,6 +468,7 @@ def run(args, dag=None):
     else:
         with redirect_stdout(ti.log, logging.INFO), redirect_stderr(ti.log, logging.WARN):
             _run(args, dag, ti)
+    logging.shutdown()
 
 
 @cli_utils.action_logging

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/utils/log/gcs_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/gcs_task_handler.py b/airflow/utils/log/gcs_task_handler.py
index d4a9871..8c34792 100644
--- a/airflow/utils/log/gcs_task_handler.py
+++ b/airflow/utils/log/gcs_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -113,13 +113,14 @@ class GCSTaskHandler(FileTaskHandler, LoggingMixin):
             remote_log = self.gcs_read(remote_loc)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
+            return log, {'end_of_log': True}
         except Exception as e:
             log = '*** Unable to read remote log from {}\n*** {}\n\n'.format(
                 remote_loc, str(e))
             self.log.error(log)
-            log += super(GCSTaskHandler, self)._read(ti, try_number)
-
-        return log, {'end_of_log': True}
+            local_log, metadata = super(GCSTaskHandler, self)._read(ti, try_number)
+            log += local_log
+            return log, metadata
 
     def gcs_read(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/utils/log/s3_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/s3_task_handler.py b/airflow/utils/log/s3_task_handler.py
index f29a92f..07b9b3e 100644
--- a/airflow/utils/log/s3_task_handler.py
+++ b/airflow/utils/log/s3_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -111,10 +111,9 @@ class S3TaskHandler(FileTaskHandler, LoggingMixin):
             remote_log = self.s3_read(remote_loc, return_error=True)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
+            return log, {'end_of_log': True}
         else:
-            log = super(S3TaskHandler, self)._read(ti, try_number)
-
-        return log, {'end_of_log': True}
+            return super(S3TaskHandler, self)._read(ti, try_number)
 
     def s3_log_exists(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/airflow/utils/log/wasb_task_handler.py
----------------------------------------------------------------------
diff --git a/airflow/utils/log/wasb_task_handler.py b/airflow/utils/log/wasb_task_handler.py
index 1a9590d..a2a0c0d 100644
--- a/airflow/utils/log/wasb_task_handler.py
+++ b/airflow/utils/log/wasb_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -119,10 +119,9 @@ class WasbTaskHandler(FileTaskHandler, LoggingMixin):
             remote_log = self.wasb_read(remote_loc, return_error=True)
             log = '*** Reading remote log from {}.\n{}\n'.format(
                 remote_loc, remote_log)
+            return log, {'end_of_log': True}
         else:
-            log = super(WasbTaskHandler, self)._read(ti, try_number)
-
-        return log, {'end_of_log': True}
+            return super(WasbTaskHandler, self)._read(ti, try_number)
 
     def wasb_log_exists(self, remote_log_location):
         """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d80e4b66/tests/utils/log/test_s3_task_handler.py
----------------------------------------------------------------------
diff --git a/tests/utils/log/test_s3_task_handler.py b/tests/utils/log/test_s3_task_handler.py
index c287fbc..a5d5f15 100644
--- a/tests/utils/log/test_s3_task_handler.py
+++ b/tests/utils/log/test_s3_task_handler.py
@@ -7,9 +7,9 @@
 # to you under the Apache License, Version 2.0 (the
 # "License"); you may not use this file except in compliance
 # with the License.  You may obtain a copy of the License at
-# 
+#
 #   http://www.apache.org/licenses/LICENSE-2.0
-# 
+#
 # Unless required by applicable law or agreed to in writing,
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -117,6 +117,14 @@ class TestS3TaskHandler(unittest.TestCase):
              'Log line\n\n'], [{'end_of_log': True}])
         )
 
+    def test_read_when_s3_log_missing(self):
+        log, metadata = self.s3_task_handler.read(self.ti)
+
+        self.assertEqual(1, len(log))
+        self.assertEqual(len(log), len(metadata))
+        self.assertIn('*** Log file does not exist:', log[0])
+        self.assertEqual({'end_of_log': True}, metadata[0])
+
     def test_read_raises_return_error(self):
         handler = self.s3_task_handler
         url = 's3://nonexistentbucket/foo'

Reply via email to