This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 42efefd9892 [GCSIO] Fix internal unit test failure (#32518)
42efefd9892 is described below
commit 42efefd989267b6d93fa5c912342feaaa07d7f02
Author: Shunping Huang <[email protected]>
AuthorDate: Mon Sep 23 10:48:02 2024 -0400
[GCSIO] Fix internal unit test failure (#32518)
* Fix internal unit test failure.
* Minor refactor and add comment.
* Fix test failure in github action.
* Mock is_soft_delete_enabled only if gcsio can be loaded.
* Disable the unused-import lint; the import is used by the mock.
* Format
---
.../python/apache_beam/options/pipeline_options.py | 5 +++
.../apache_beam/options/pipeline_options_test.py | 47 +++++++++++++---------
2 files changed, 34 insertions(+), 18 deletions(-)
diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py
index 50021c4610f..4497ab0993a 100644
--- a/sdks/python/apache_beam/options/pipeline_options.py
+++ b/sdks/python/apache_beam/options/pipeline_options.py
@@ -975,6 +975,11 @@ class GoogleCloudOptions(PipelineOptions):
# Log warning if soft delete policy is enabled in a gcs bucket
# that is specified in an argument.
def _warn_if_soft_delete_policy_enabled(self, arg_name):
+ # skip the check if it is in dry-run mode because the later step requires
+ # internet connection to access GCS
+ if self.view_as(TestOptions).dry_run:
+ return
+
gcs_path = getattr(self, arg_name, None)
try:
from apache_beam.io.gcp import gcsio
diff --git a/sdks/python/apache_beam/options/pipeline_options_test.py b/sdks/python/apache_beam/options/pipeline_options_test.py
index 61b227d9a24..c0616bc6451 100644
--- a/sdks/python/apache_beam/options/pipeline_options_test.py
+++ b/sdks/python/apache_beam/options/pipeline_options_test.py
@@ -44,6 +44,12 @@ from apache_beam.transforms.display_test import DisplayDataItemMatcher
_LOGGER = logging.getLogger(__name__)
+try:
+ import apache_beam.io.gcp.gcsio # pylint: disable=unused-import
+ has_gcsio = True
+except ImportError:
+ has_gcsio = False
+
# Mock runners to use for validations.
class MockRunners(object):
@@ -711,6 +717,16 @@ class PipelineOptionsTest(unittest.TestCase):
"the dest and the flag name to the map "
"_FLAG_THAT_SETS_FALSE_VALUE in PipelineOptions.py")
+ def _check_errors(self, options, validator, expected):
+ if has_gcsio:
+ with mock.patch('apache_beam.io.gcp.gcsio.GcsIO.is_soft_delete_enabled',
+ return_value=False):
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(errors, expected)
+ else:
+ errors = options._handle_temp_and_staging_locations(validator)
+ self.assertEqual(errors, expected)
+
def test_validation_good_stg_good_temp(self):
runner = MockRunners.DataflowRunner()
options = GoogleCloudOptions([
@@ -719,8 +735,7 @@ class PipelineOptionsTest(unittest.TestCase):
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
- errors = options._handle_temp_and_staging_locations(validator)
- self.assertEqual(errors, [])
+ self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
@@ -734,8 +749,7 @@ class PipelineOptionsTest(unittest.TestCase):
'--temp_location=gs://beam/tmp'
])
validator = PipelineOptionsValidator(options, runner)
- errors = options._handle_temp_and_staging_locations(validator)
- self.assertEqual(errors, [])
+ self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/tmp")
self.assertEqual(
@@ -749,8 +763,7 @@ class PipelineOptionsTest(unittest.TestCase):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
- errors = options._handle_temp_and_staging_locations(validator)
- self.assertEqual(errors, [])
+ self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://beam/stg")
self.assertEqual(
@@ -764,8 +777,7 @@ class PipelineOptionsTest(unittest.TestCase):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
- errors = options._handle_temp_and_staging_locations(validator)
- self.assertEqual(errors, [])
+ self._check_errors(options, validator, [])
self.assertEqual(
options.get_all_options()['staging_location'], "gs://default/bucket")
self.assertEqual(
@@ -779,16 +791,15 @@ class PipelineOptionsTest(unittest.TestCase):
'--temp_location=badGSpath'
])
validator = PipelineOptionsValidator(options, runner)
- errors = options._handle_temp_and_staging_locations(validator)
- self.assertEqual(len(errors), 2, errors)
- self.assertIn(
- 'Invalid GCS path (badGSpath), given for the option: temp_location.',
- errors,
- errors)
- self.assertIn(
- 'Invalid GCS path (badGSpath), given for the option:
staging_location.',
- errors,
- errors)
+ self._check_errors(
+ options,
+ validator,
+ [
+ 'Invalid GCS path (badGSpath), given for the option: ' \
+ 'temp_location.',
+ 'Invalid GCS path (badGSpath), given for the option: ' \
+ 'staging_location.'
+ ])
if __name__ == '__main__':