This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 4da46b0ae5b Fix duplicated prefix when handling an absolute jar path
for provider. (#35849)
4da46b0ae5b is described below
commit 4da46b0ae5bf9060dfe0b8c7a7d2faa75b00a17a
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Aug 13 17:00:28 2025 -0400
Fix duplicated prefix when handling an absolute jar path for provider.
(#35849)
* Fix an issue with specifying an absolute jar path in provider.
* Add test and fix an edge case.
* Apply yapf.
* Polish based on gemini review.
* Resolve test failure on github.
* A second attempt to fix failed test.
* Continue to fix test.
---
sdks/python/apache_beam/yaml/yaml_provider.py | 15 +++--
.../apache_beam/yaml/yaml_provider_unit_test.py | 66 ++++++++++++++++++++++
2 files changed, 77 insertions(+), 4 deletions(-)
diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py
b/sdks/python/apache_beam/yaml/yaml_provider.py
index 34c14c8ee8e..70ad9309dc3 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -1520,13 +1520,20 @@ def _as_list(func):
def _join_url_or_filepath(base, path):
if not base:
return path
- base_scheme = urllib.parse.urlparse(base, '').scheme
- path_scheme = urllib.parse.urlparse(path, base_scheme).scheme
- if path_scheme != base_scheme:
+
+ if urllib.parse.urlparse(path).scheme:
+ # path is an absolute path with scheme (whether it is the same as base or
+ # not).
return path
- elif base_scheme and base_scheme in urllib.parse.uses_relative:
+
+ # path is a relative path or an absolute path without scheme (e.g. /a/b/c)
+ base_scheme = urllib.parse.urlparse(base, '').scheme
+ if base_scheme and base_scheme in urllib.parse.uses_relative:
return urllib.parse.urljoin(base, path)
else:
+ if FileSystems.join(base, "") == base:
+ # base ends with a filesystem separator
+ return FileSystems.join(base, path)
return FileSystems.join(FileSystems.split(base)[0], path)
diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
index fe8b6c7b89a..1ebae9a3b44 100644
--- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
@@ -17,6 +17,7 @@
import logging
import os
+import sys
import tempfile
import unittest
@@ -295,6 +296,71 @@ class PythonProviderDepsTest(unittest.TestCase):
self.assertNotEqual(before, after)
+class JoinUrlOrFilepathTest(unittest.TestCase):
+ def test_join_url_relative_path(self):
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('http://example.com/a',
'b/c.yaml'),
+ 'http://example.com/b/c.yaml')
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath(
+ 'http://example.com/a/', 'b/c.yaml'),
+ 'http://example.com/a/b/c.yaml')
+
+ # Mock the GCS filesystem split and join with plain string operations.
+ with mock.patch('apache_beam.io.filesystems.FileSystems.split',
+ new=lambda x:
+ ("gs://bucket", x.removeprefix("gs://bucket/"))):
+ with mock.patch('apache_beam.io.filesystems.FileSystems.join',
+ new=lambda *args: '/'.join(args)):
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('gs://bucket', 'b/c.yaml'),
+ 'gs://bucket/b/c.yaml')
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('gs://bucket/', 'b/c.yaml'),
+ 'gs://bucket/b/c.yaml')
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('gs://bucket/a', 'b/c.yaml'),
+ 'gs://bucket/b/c.yaml')
+
+ def test_join_filepath_relative_path(self):
+ if sys.platform != 'win32':
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('/a/b/', 'c/d.yaml'),
+ '/a/b/c/d.yaml')
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('/a/b', 'c/d.yaml'),
+ '/a/c/d.yaml')
+ else:
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('C:\\a\\b\\', 'c\\d.yaml'),
+ 'C:\\a\\b\\c\\d.yaml')
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('C:\\a\\b', 'c\\d.yaml'),
+ 'C:\\a\\c\\d.yaml')
+
+ def test_absolute_path(self):
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath(
+ 'gs://bucket/a', 'gs://bucket/b/c.yaml'),
+ 'gs://bucket/b/c.yaml')
+
+ if sys.platform != 'win32':
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('/a/b', '/c/d.yaml'),
'/c/d.yaml')
+
+ def test_different_scheme(self):
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath(
+ 'http://example.com/a', 'gs://bucket/b/c.yaml'),
+ 'gs://bucket/b/c.yaml')
+
+ def test_empty_base(self):
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath('', 'a/b.yaml'), 'a/b.yaml')
+ self.assertEqual(
+ yaml_provider._join_url_or_filepath(None, 'a/b.yaml'), 'a/b.yaml')
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()