This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 42efefd9892 [GCSIO] Fix internal unit test failure (#32518)
42efefd9892 is described below

commit 42efefd989267b6d93fa5c912342feaaa07d7f02
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Sep 23 10:48:02 2024 -0400

    [GCSIO] Fix internal unit test failure (#32518)
    
    * Fix internal unit test failure.
    
    * Minor refactor and add comment.
    
    * Fix test failure in github action.
    
    * Mock is_soft_delete_enabled only if gcsio can be loaded.
    
    * Disable unused import lint. It is used in mock.
    
    * Format
---
 .../python/apache_beam/options/pipeline_options.py |  5 +++
 .../apache_beam/options/pipeline_options_test.py   | 47 +++++++++++++---------
 2 files changed, 34 insertions(+), 18 deletions(-)

diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 50021c4610f..4497ab0993a 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -975,6 +975,11 @@ class GoogleCloudOptions(PipelineOptions):
   # Log warning if soft delete policy is enabled in a gcs bucket
   # that is specified in an argument.
   def _warn_if_soft_delete_policy_enabled(self, arg_name):
+    # skip the check if it is in dry-run mode because the later step requires
+    # internet connection to access GCS
+    if self.view_as(TestOptions).dry_run:
+      return
+
     gcs_path = getattr(self, arg_name, None)
     try:
       from apache_beam.io.gcp import gcsio
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 61b227d9a24..c0616bc6451 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -44,6 +44,12 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
 _LOGGER = logging.getLogger(__name__)
 
+try:
+  import apache_beam.io.gcp.gcsio  # pylint: disable=unused-import
+  has_gcsio = True
+except ImportError:
+  has_gcsio = False
+
 
 # Mock runners to use for validations.
 class MockRunners(object):
@@ -711,6 +717,16 @@ class PipelineOptionsTest(unittest.TestCase):
         "the dest and the flag name to the map "
         "_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")
 
+  def _check_errors(self, options, validator, expected):
+    if has_gcsio:
+      with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled',
+                      return_value=False):
+        errors = options._handle_temp_and_staging_locations(validator)
+        self.assertEqual(errors, expected)
+    else:
+      errors = options._handle_temp_and_staging_locations(validator)
+      self.assertEqual(errors, expected)
+
   def test_validation_good_stg_good_temp(self):
     runner = MockRunners.DataflowRunner()
     options = GoogleCloudOptions([
@@ -719,8 +735,7 @@ class PipelineOptionsTest(unittest.TestCase):
         '--temp_location=gs://beam/tmp'
     ])
     validator = PipelineOptionsValidator(options, runner)
-    errors = options._handle_temp_and_staging_locations(validator)
-    self.assertEqual(errors, [])
+    self._check_errors(options, validator, [])
     self.assertEqual(
         options.get_all_options()['staging_location'], "gs://beam/stg")
     self.assertEqual(
@@ -734,8 +749,7 @@ class PipelineOptionsTest(unittest.TestCase):
         '--temp_location=gs://beam/tmp'
     ])
     validator = PipelineOptionsValidator(options, runner)
-    errors = options._handle_temp_and_staging_locations(validator)
-    self.assertEqual(errors, [])
+    self._check_errors(options, validator, [])
     self.assertEqual(
         options.get_all_options()['staging_location'], "gs://beam/tmp")
     self.assertEqual(
@@ -749,8 +763,7 @@ class PipelineOptionsTest(unittest.TestCase):
         '--temp_location=badGSpath'
     ])
     validator = PipelineOptionsValidator(options, runner)
-    errors = options._handle_temp_and_staging_locations(validator)
-    self.assertEqual(errors, [])
+    self._check_errors(options, validator, [])
     self.assertEqual(
         options.get_all_options()['staging_location'], "gs://beam/stg")
     self.assertEqual(
@@ -764,8 +777,7 @@ class PipelineOptionsTest(unittest.TestCase):
         '--temp_location=badGSpath'
     ])
     validator = PipelineOptionsValidator(options, runner)
-    errors = options._handle_temp_and_staging_locations(validator)
-    self.assertEqual(errors, [])
+    self._check_errors(options, validator, [])
     self.assertEqual(
         options.get_all_options()['staging_location'], "gs://default/bucket")
     self.assertEqual(
@@ -779,16 +791,15 @@ class PipelineOptionsTest(unittest.TestCase):
         '--temp_location=badGSpath'
     ])
     validator = PipelineOptionsValidator(options, runner)
-    errors = options._handle_temp_and_staging_locations(validator)
-    self.assertEqual(len(errors), 2, errors)
-    self.assertIn(
-        'Invalid GCS path (badGSpath), given for the option: temp_location.',
-        errors,
-        errors)
-    self.assertIn(
-        'Invalid GCS path (badGSpath), given for the option: staging_location.',
-        errors,
-        errors)
+    self._check_errors(
+        options,
+        validator,
+        [
+          'Invalid GCS path (badGSpath), given for the option: ' \
+            'temp_location.',
+          'Invalid GCS path (badGSpath), given for the option: ' \
+            'staging_location.'
+        ])
 
 
 if __name__ == '__main__':

Reply via email to