[ 
https://issues.apache.org/jira/browse/AIRFLOW-2825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16564241#comment-16564241
 ] 

ASF GitHub Bot commented on AIRFLOW-2825:
-----------------------------------------

Fokko closed pull request #3665: [AIRFLOW-2825] Fix S3ToHiveTransfer bug due to 
case
URL: https://github.com/apache/incubator-airflow/pull/3665
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/operators/s3_to_hive_operator.py 
b/airflow/operators/s3_to_hive_operator.py
index 09eb8363c0..5faaf916b7 100644
--- a/airflow/operators/s3_to_hive_operator.py
+++ b/airflow/operators/s3_to_hive_operator.py
@@ -153,7 +153,7 @@ def execute(self, context):
 
         root, file_ext = os.path.splitext(s3_key_object.key)
         if (self.select_expression and self.input_compressed and
-                file_ext != '.gz'):
+                file_ext.lower() != '.gz'):
             raise AirflowException("GZIP is the only compression " +
                                    "format Amazon S3 Select supports")
 
diff --git a/tests/operators/s3_to_hive_operator.py 
b/tests/operators/s3_to_hive_operator.py
index 482e7fefc8..6ca6274a2c 100644
--- a/tests/operators/s3_to_hive_operator.py
+++ b/tests/operators/s3_to_hive_operator.py
@@ -89,6 +89,11 @@ def setUp(self):
                                mode="wb") as f_gz_h:
                 self._set_fn(fn_gz, '.gz', True)
                 f_gz_h.writelines([header, line1, line2])
+            fn_gz_upper = self._get_fn('.txt', True) + ".GZ"
+            with gzip.GzipFile(filename=fn_gz_upper,
+                               mode="wb") as f_gz_upper_h:
+                self._set_fn(fn_gz_upper, '.GZ', True)
+                f_gz_upper_h.writelines([header, line1, line2])
             fn_bz2 = self._get_fn('.txt', True) + '.bz2'
             with bz2.BZ2File(filename=fn_bz2,
                              mode="wb") as f_bz2_h:
@@ -105,6 +110,11 @@ def setUp(self):
                                mode="wb") as f_gz_nh:
                 self._set_fn(fn_gz, '.gz', False)
                 f_gz_nh.writelines([line1, line2])
+            fn_gz_upper = self._get_fn('.txt', False) + ".GZ"
+            with gzip.GzipFile(filename=fn_gz_upper,
+                               mode="wb") as f_gz_upper_nh:
+                self._set_fn(fn_gz_upper, '.GZ', False)
+                f_gz_upper_nh.writelines([line1, line2])
             fn_bz2 = self._get_fn('.txt', False) + '.bz2'
             with bz2.BZ2File(filename=fn_bz2,
                              mode="wb") as f_bz2_nh:
@@ -143,7 +153,7 @@ def _check_file_equality(self, fn_1, fn_2, ext):
         # gz files contain mtime and filename in the header that
         # causes filecmp to return False even if contents are identical
         # Hence decompress to test for equality
-        if(ext == '.gz'):
+        if(ext.lower() == '.gz'):
             with gzip.GzipFile(fn_1, 'rb') as f_1,\
                  NamedTemporaryFile(mode='wb') as f_txt_1,\
                  gzip.GzipFile(fn_2, 'rb') as f_2,\
@@ -220,14 +230,14 @@ def test_execute(self, mock_hiveclihook):
         conn.create_bucket(Bucket='bucket')
 
         # Testing txt, zip, bz2 files with and without header row
-        for (ext, has_header) in product(['.txt', '.gz', '.bz2'], [True, 
False]):
+        for (ext, has_header) in product(['.txt', '.gz', '.bz2', '.GZ'], 
[True, False]):
             self.kwargs['headers'] = has_header
             self.kwargs['check_headers'] = has_header
             logging.info("Testing {0} format {1} header".
                          format(ext,
                                 ('with' if has_header else 'without'))
                          )
-            self.kwargs['input_compressed'] = ext != '.txt'
+            self.kwargs['input_compressed'] = ext.lower() != '.txt'
             self.kwargs['s3_key'] = 's3://bucket/' + self.s3_key + ext
             ip_fn = self._get_fn(ext, self.kwargs['headers'])
             op_fn = self._get_fn(ext, False)
@@ -260,8 +270,8 @@ def test_execute_with_select_expression(self, 
mock_hiveclihook):
         # Only testing S3ToHiveTransfer calls S3Hook.select_key with
         # the right parameters and its execute method succeeds here,
         # since Moto doesn't support select_object_content as of 1.3.2.
-        for (ext, has_header) in product(['.txt', '.gz'], [True, False]):
-            input_compressed = ext != '.txt'
+        for (ext, has_header) in product(['.txt', '.gz', '.GZ'], [True, 
False]):
+            input_compressed = ext.lower() != '.txt'
             key = self.s3_key + ext
 
             self.kwargs['check_headers'] = False


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> S3ToHiveTransfer operator may not be able to handle GZIP file with uppercase 
> ext in S3
> ---------------------------------------------------------------------------------------
>
>                 Key: AIRFLOW-2825
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-2825
>             Project: Apache Airflow
>          Issue Type: Bug
>          Components: operators
>            Reporter: Xiaodong DENG
>            Assignee: Xiaodong DENG
>            Priority: Critical
>
> Because upper/lower case was not considered in the extension check, 
> S3ToHiveTransfer operator may think a GZIP file with uppercase ext `.GZ` is 
> not a GZIP file and raise exception.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to