This is an automated email from the ASF dual-hosted git repository.

shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4da46b0ae5b Fix duplicated prefix when handling an absolute jar path 
for provider. (#35849)
4da46b0ae5b is described below

commit 4da46b0ae5bf9060dfe0b8c7a7d2faa75b00a17a
Author: Shunping Huang <[email protected]>
AuthorDate: Wed Aug 13 17:00:28 2025 -0400

    Fix duplicated prefix when handling an absolute jar path for provider. 
(#35849)
    
    * Fix an issue with specifying an absolute jar path in provider.
    
    * Add test and fix an edge case.
    
    * Apply yapf.
    
    * Polish based on gemini review.
    
    * Resolve test failure on github.
    
    * A second attempt to fix failed test.
    
    * Continue to fix test.
---
 sdks/python/apache_beam/yaml/yaml_provider.py      | 15 +++--
 .../apache_beam/yaml/yaml_provider_unit_test.py    | 66 ++++++++++++++++++++++
 2 files changed, 77 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/yaml/yaml_provider.py 
b/sdks/python/apache_beam/yaml/yaml_provider.py
index 34c14c8ee8e..70ad9309dc3 100755
--- a/sdks/python/apache_beam/yaml/yaml_provider.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider.py
@@ -1520,13 +1520,20 @@ def _as_list(func):
 def _join_url_or_filepath(base, path):
   if not base:
     return path
-  base_scheme = urllib.parse.urlparse(base, '').scheme
-  path_scheme = urllib.parse.urlparse(path, base_scheme).scheme
-  if path_scheme != base_scheme:
+
+  if urllib.parse.urlparse(path).scheme:
+    # path is an absolute path with scheme (whether it is the same as base or
+    # not).
     return path
-  elif base_scheme and base_scheme in urllib.parse.uses_relative:
+
+  # path is a relative path or an absolute path without scheme (e.g. /a/b/c)
+  base_scheme = urllib.parse.urlparse(base, '').scheme
+  if base_scheme and base_scheme in urllib.parse.uses_relative:
     return urllib.parse.urljoin(base, path)
   else:
+    if FileSystems.join(base, "") == base:
+      # base ends with a filesystem separator
+      return FileSystems.join(base, path)
     return FileSystems.join(FileSystems.split(base)[0], path)
 
 
diff --git a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py 
b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
index fe8b6c7b89a..1ebae9a3b44 100644
--- a/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
+++ b/sdks/python/apache_beam/yaml/yaml_provider_unit_test.py
@@ -17,6 +17,7 @@
 
 import logging
 import os
+import sys
 import tempfile
 import unittest
 
@@ -295,6 +296,71 @@ class PythonProviderDepsTest(unittest.TestCase):
       self.assertNotEqual(before, after)
 
 
+class JoinUrlOrFilepathTest(unittest.TestCase):
+  def test_join_url_relative_path(self):
+    self.assertEqual(
+        yaml_provider._join_url_or_filepath('http://example.com/a', 'b/c.yaml'),
+        'http://example.com/b/c.yaml')
+    self.assertEqual(
+        yaml_provider._join_url_or_filepath(
+            'http://example.com/a/', 'b/c.yaml'),
+        'http://example.com/a/b/c.yaml')
+
+    # Mock GCS filesystem split and join with simple '/'-based helpers.
+    with mock.patch('apache_beam.io.filesystems.FileSystems.split',
+                    new=lambda x:
+                    ("gs://bucket", x.removeprefix("gs://bucket/"))):
+      with mock.patch('apache_beam.io.filesystems.FileSystems.join',
+                      new=lambda *args: '/'.join(args)):
+        self.assertEqual(
+            yaml_provider._join_url_or_filepath('gs://bucket', 'b/c.yaml'),
+            'gs://bucket/b/c.yaml')
+        self.assertEqual(
+            yaml_provider._join_url_or_filepath('gs://bucket/', 'b/c.yaml'),
+            'gs://bucket/b/c.yaml')
+        self.assertEqual(
+            yaml_provider._join_url_or_filepath('gs://bucket/a', 'b/c.yaml'),
+            'gs://bucket/b/c.yaml')
+
+  def test_join_filepath_relative_path(self):
+    if sys.platform != 'win32':
+      self.assertEqual(
+          yaml_provider._join_url_or_filepath('/a/b/', 'c/d.yaml'),
+          '/a/b/c/d.yaml')
+      self.assertEqual(
+          yaml_provider._join_url_or_filepath('/a/b', 'c/d.yaml'),
+          '/a/c/d.yaml')
+    else:
+      self.assertEqual(
+          yaml_provider._join_url_or_filepath('C:\\a\\b\\', 'c\\d.yaml'),
+          'C:\\a\\b\\c\\d.yaml')
+      self.assertEqual(
+          yaml_provider._join_url_or_filepath('C:\\a\\b', 'c\\d.yaml'),
+          'C:\\a\\c\\d.yaml')
+
+  def test_absolute_path(self):
+    self.assertEqual(
+        yaml_provider._join_url_or_filepath(
+            'gs://bucket/a', 'gs://bucket/b/c.yaml'),
+        'gs://bucket/b/c.yaml')
+
+    if sys.platform != 'win32':
+      self.assertEqual(
+          yaml_provider._join_url_or_filepath('/a/b', '/c/d.yaml'), '/c/d.yaml')
+
+  def test_different_scheme(self):
+    self.assertEqual(
+        yaml_provider._join_url_or_filepath(
+            'http://example.com/a', 'gs://bucket/b/c.yaml'),
+        'gs://bucket/b/c.yaml')
+
+  def test_empty_base(self):
+    self.assertEqual(
+        yaml_provider._join_url_or_filepath('', 'a/b.yaml'), 'a/b.yaml')
+    self.assertEqual(
+        yaml_provider._join_url_or_filepath(None, 'a/b.yaml'), 'a/b.yaml')
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()

Reply via email to