This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b7f6082e77 [python] support data file.external paths in pypaimon (#6674)
b7f6082e77 is described below
commit b7f6082e77a459dfba8573f65bb07a36354fb3b3
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Nov 26 21:38:09 2025 +0800
[python] support data file.external paths in pypaimon (#6674)
---
paimon-python/pypaimon/common/core_options.py | 71 +++-
.../pypaimon/common/external_path_provider.py | 43 +++
.../pypaimon/read/scanner/full_starting_scanner.py | 4 +-
paimon-python/pypaimon/read/split_read.py | 5 +-
paimon-python/pypaimon/table/file_store_table.py | 72 +++-
.../pypaimon/tests/external_paths_test.py | 427 +++++++++++++++++++++
paimon-python/pypaimon/utils/__init__.py | 17 +
.../pypaimon/utils/file_store_path_factory.py | 115 ++++++
paimon-python/pypaimon/write/file_store_commit.py | 16 +-
paimon-python/pypaimon/write/writer/blob_writer.py | 18 +-
.../pypaimon/write/writer/data_blob_writer.py | 11 +-
paimon-python/pypaimon/write/writer/data_writer.py | 63 +--
12 files changed, 813 insertions(+), 49 deletions(-)
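For context before the diff: this change introduces three table options (data-file.external-paths, data-file.external-paths.strategy, data-file.external-paths.specific-fs) and threads them through the write, read and abort paths. A minimal sketch of how they might be set as table options (values are illustrative placeholders, not taken from the commit):

    from pypaimon.common.core_options import CoreOptions, ExternalPathStrategy

    table_options = {
        # Comma-separated external roots; every entry must carry a scheme.
        CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket-a/tbl,oss://bucket-b/tbl",
        # Strategy: none (disabled), round-robin, or specific-fs.
        CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
        # Only consulted when the strategy is specific-fs, e.g. "oss".
        # CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: "oss",
    }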
diff --git a/paimon-python/pypaimon/common/core_options.py b/paimon-python/pypaimon/common/core_options.py
index 87b8e9034a..028f757b77 100644
--- a/paimon-python/pypaimon/common/core_options.py
+++ b/paimon-python/pypaimon/common/core_options.py
@@ -17,6 +17,7 @@
################################################################################
from enum import Enum
+from typing import List, Optional
from pypaimon.common.memory_size import MemorySize
@@ -58,13 +59,17 @@ class CoreOptions(str, Enum):
COMMIT_USER_PREFIX = "commit.user-prefix"
ROW_TRACKING_ENABLED = "row-tracking.enabled"
DATA_EVOLUTION_ENABLED = "data-evolution.enabled"
+ # External paths options
+ DATA_FILE_EXTERNAL_PATHS = "data-file.external-paths"
+ DATA_FILE_EXTERNAL_PATHS_STRATEGY = "data-file.external-paths.strategy"
+ DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS = "data-file.external-paths.specific-fs"
@staticmethod
- def get_blob_as_descriptor(options: dict) -> bool:
+ def blob_as_descriptor(options: dict) -> bool:
return options.get(CoreOptions.FILE_BLOB_AS_DESCRIPTOR,
"false").lower() == 'true'
@staticmethod
- def get_split_target_size(options: dict) -> int:
+ def split_target_size(options: dict) -> int:
"""Get split target size from options, default to 128MB."""
if CoreOptions.SOURCE_SPLIT_TARGET_SIZE in options:
size_str = options[CoreOptions.SOURCE_SPLIT_TARGET_SIZE]
@@ -72,7 +77,7 @@ class CoreOptions(str, Enum):
return MemorySize.of_mebi_bytes(128).get_bytes()
@staticmethod
- def get_split_open_file_cost(options: dict) -> int:
+ def split_open_file_cost(options: dict) -> int:
"""Get split open file cost from options, default to 4MB."""
if CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST in options:
cost_str = options[CoreOptions.SOURCE_SPLIT_OPEN_FILE_COST]
@@ -80,7 +85,7 @@ class CoreOptions(str, Enum):
return MemorySize.of_mebi_bytes(4).get_bytes()
@staticmethod
- def get_target_file_size(options: dict, has_primary_key: bool = False) -> int:
+ def target_file_size(options: dict, has_primary_key: bool = False) -> int:
"""Get target file size from options, default to 128MB for primary key
table, 256MB for append-only table."""
if CoreOptions.TARGET_FILE_SIZE in options:
size_str = options[CoreOptions.TARGET_FILE_SIZE]
@@ -88,9 +93,63 @@ class CoreOptions(str, Enum):
return MemorySize.of_mebi_bytes(128 if has_primary_key else
256).get_bytes()
@staticmethod
- def get_blob_target_file_size(options: dict) -> int:
+ def blob_target_file_size(options: dict) -> int:
"""Get blob target file size from options, default to target-file-size
(256MB for append-only table)."""
if CoreOptions.BLOB_TARGET_FILE_SIZE in options:
size_str = options[CoreOptions.BLOB_TARGET_FILE_SIZE]
return MemorySize.parse(size_str).get_bytes()
- return CoreOptions.get_target_file_size(options, has_primary_key=False)
+ return CoreOptions.target_file_size(options, has_primary_key=False)
+
+ @staticmethod
+ def data_file_external_paths(options: dict) -> Optional[List[str]]:
+ external_paths_str = options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS)
+ if not external_paths_str:
+ return None
+ return [path.strip() for path in external_paths_str.split(",") if path.strip()]
+
+ @staticmethod
+ def external_path_strategy(options: dict) -> 'ExternalPathStrategy':
+ strategy_value = options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY, "none")
+ if strategy_value is None:
+ strategy_value = "none"
+
+ strategy_str = strategy_value.lower() if isinstance(strategy_value, str) else str(strategy_value).lower()
+
+ try:
+ return ExternalPathStrategy(strategy_str)
+ except ValueError:
+ valid_values = [e.value for e in ExternalPathStrategy]
+ raise ValueError(
+ f"Could not parse value '{strategy_value}' for key
'{CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY}'. "
+ f"Expected one of: {valid_values}"
+ )
+
+ @staticmethod
+ def external_specific_fs(options: dict) -> Optional[str]:
+ return options.get(CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS)
+
+ @staticmethod
+ def file_compression(options: dict) -> str:
+ """Get file compression from options, default to 'zstd'."""
+ compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+ if compression is None:
+ compression = "zstd"
+ return compression
+
+ @staticmethod
+ def file_format(options: dict, default: Optional[str] = None) -> str:
+ if default is None:
+ default = CoreOptions.FILE_FORMAT_PARQUET
+ file_format = options.get(CoreOptions.FILE_FORMAT, default)
+ if file_format is None:
+ file_format = default
+ return file_format.lower() if file_format else file_format
+
+
+class ExternalPathStrategy(str, Enum):
+ """
+ Strategy for selecting external paths.
+ """
+ NONE = "none"
+ ROUND_ROBIN = "round-robin"
+ SPECIFIC_FS = "specific-fs"
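A quick, illustrative use of the parsing helpers added above (not part of the commit): data_file_external_paths splits and trims the comma-separated list, external_path_strategy validates the value against ExternalPathStrategy, and external_specific_fs returns the raw scheme filter.

    from pypaimon.common.core_options import CoreOptions, ExternalPathStrategy

    opts = {
        CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/a, s3://bucket2/b",
        CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: "specific-fs",
        CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: "oss",
    }
    assert CoreOptions.data_file_external_paths(opts) == ["oss://bucket1/a", "s3://bucket2/b"]
    assert CoreOptions.external_path_strategy(opts) == ExternalPathStrategy.SPECIFIC_FS
    assert CoreOptions.external_specific_fs(opts) == "oss"
    # An unknown strategy value raises ValueError listing the valid values.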
diff --git a/paimon-python/pypaimon/common/external_path_provider.py b/paimon-python/pypaimon/common/external_path_provider.py
new file mode 100644
index 0000000000..a2989120b6
--- /dev/null
+++ b/paimon-python/pypaimon/common/external_path_provider.py
@@ -0,0 +1,43 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import random
+from typing import List
+
+
+class ExternalPathProvider:
+ def __init__(self, external_table_paths: List[str], relative_bucket_path: str):
+ self.external_table_paths = external_table_paths
+ self.relative_bucket_path = relative_bucket_path
+ self.position = random.randint(0, len(external_table_paths) - 1) if external_table_paths else 0
+
+ def get_next_external_data_path(self, file_name: str) -> str:
+ """
+ Get the next external data path using round-robin strategy.
+ """
+ if not self.external_table_paths:
+ raise ValueError("No external paths available")
+
+ self.position += 1
+ if self.position == len(self.external_table_paths):
+ self.position = 0
+
+ external_base = self.external_table_paths[self.position]
+ if self.relative_bucket_path:
+ return f"{external_base.rstrip('/')}/{self.relative_bucket_path.strip('/')}/{file_name}"
+ else:
+ return f"{external_base.rstrip('/')}/{file_name}"
diff --git a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
index dfbe329f43..a3503cdd03 100644
--- a/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
+++ b/paimon-python/pypaimon/read/scanner/full_starting_scanner.py
@@ -55,8 +55,8 @@ class FullStartingScanner(StartingScanner):
self.predicate, self.table.field_names, self.table.partition_keys)
# Get split target size and open file cost from table options
- self.target_split_size = CoreOptions.get_split_target_size(self.table.options)
- self.open_file_cost = CoreOptions.get_split_open_file_cost(self.table.options)
+ self.target_split_size = CoreOptions.split_target_size(self.table.options)
+ self.open_file_cost = CoreOptions.split_open_file_cost(self.table.options)
self.idx_of_this_subtask = None
self.number_of_para_subtasks = None
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index 5c90c2ddb6..92152db7ee 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -88,7 +88,8 @@ class SplitRead(ABC):
def file_reader_supplier(self, file: DataFileMeta, for_merge_read: bool,
read_fields: List[str]):
(read_file_fields, read_arrow_predicate) = self._get_fields_and_predicate(file.schema_id, read_fields)
- file_path = file.file_path
+ # Use external_path if available, otherwise use file_path
+ file_path = file.external_path if file.external_path else file.file_path
_, extension = os.path.splitext(file_path)
file_format = extension[1:]
@@ -97,7 +98,7 @@ class SplitRead(ABC):
format_reader = FormatAvroReader(self.table.file_io, file_path,
read_file_fields,
self.read_fields,
read_arrow_predicate)
elif file_format == CoreOptions.FILE_FORMAT_BLOB:
- blob_as_descriptor = CoreOptions.get_blob_as_descriptor(self.table.options)
+ blob_as_descriptor = CoreOptions.blob_as_descriptor(self.table.options)
format_reader = FormatBlobReader(self.table.file_io, file_path,
read_file_fields,
self.read_fields,
read_arrow_predicate, blob_as_descriptor)
elif file_format == CoreOptions.FILE_FORMAT_PARQUET or file_format == CoreOptions.FILE_FORMAT_ORC:
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index b8e08b6f52..382d3a59b0 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-from typing import Optional
+from typing import List, Optional
from pypaimon.catalog.catalog_environment import CatalogEnvironment
from pypaimon.common.core_options import CoreOptions
@@ -70,6 +70,32 @@ class FileStoreTable(Table):
from pypaimon.snapshot.snapshot_manager import SnapshotManager
return SnapshotManager(self)
+ def path_factory(self) -> 'FileStorePathFactory':
+ from pypaimon.utils.file_store_path_factory import FileStorePathFactory
+
+ # Get external paths
+ external_paths = self._create_external_paths()
+
+ # Get format identifier
+ format_identifier = CoreOptions.file_format(self.options)
+
+ file_compression = CoreOptions.file_compression(self.options)
+
+ return FileStorePathFactory(
+ root=str(self.table_path),
+ partition_keys=self.partition_keys,
+ default_part_value="__DEFAULT_PARTITION__",
+ format_identifier=format_identifier,
+ data_file_prefix="data-",
+ changelog_file_prefix="changelog-",
+ legacy_partition_name=True,
+ file_suffix_include_compression=False,
+ file_compression=file_compression,
+ data_file_path_directory=None,
+ external_paths=external_paths,
+ index_file_in_data_file_dir=False,
+ )
+
def new_snapshot_commit(self):
"""Create a new SnapshotCommit instance using the catalog
environment."""
return
self.catalog_environment.snapshot_commit(self.snapshot_manager())
@@ -129,3 +155,47 @@ class FileStoreTable(Table):
def add_options(self, options: dict):
for key, value in options.items():
self.options[key] = value
+
+ def _create_external_paths(self) -> List[str]:
+ from urllib.parse import urlparse
+ from pypaimon.common.core_options import ExternalPathStrategy
+
+ external_paths_str = CoreOptions.data_file_external_paths(self.options)
+ if not external_paths_str:
+ return []
+
+ strategy = CoreOptions.external_path_strategy(self.options)
+ if strategy == ExternalPathStrategy.NONE:
+ return []
+
+ specific_fs = CoreOptions.external_specific_fs(self.options)
+
+ paths = []
+ for path_string in external_paths_str:
+ if not path_string:
+ continue
+
+ # Parse and validate path
+ parsed = urlparse(path_string)
+ scheme = parsed.scheme
+ if not scheme:
+ raise ValueError(
+ f"External path must have a scheme (e.g., oss://, s3://,
file://): {path_string}"
+ )
+
+ # Filter by specific filesystem if strategy is specific-fs
+ if strategy == ExternalPathStrategy.SPECIFIC_FS:
+ if not specific_fs:
+ raise ValueError(
+ f"data-file.external-paths.specific-fs must be set
when "
+ f"strategy is {ExternalPathStrategy.SPECIFIC_FS}"
+ )
+ if scheme.lower() != specific_fs.lower():
+ continue # Skip paths that don't match the specific filesystem
+
+ paths.append(path_string)
+
+ if not paths:
+ raise ValueError("No valid external paths found after filtering")
+
+ return paths
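To illustrate the filtering above (a sketch mirroring the new tests; the warehouse location and table name are placeholders): with the specific-fs strategy only roots whose scheme matches data-file.external-paths.specific-fs are kept, and a path without any scheme raises ValueError.

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema
    from pypaimon.common.core_options import CoreOptions, ExternalPathStrategy

    catalog = CatalogFactory.create({"warehouse": "/tmp/warehouse"})  # placeholder
    catalog.create_database("demo_db", True)
    options = {
        CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://b1/p1,s3://b2/p2",
        CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.SPECIFIC_FS,
        CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: "oss",
    }
    schema = Schema.from_pyarrow_schema(pa.schema([("id", pa.int32())]), options=options)
    catalog.create_table("demo_db.fs_filter_demo", schema, True)
    table = catalog.get_table("demo_db.fs_filter_demo")
    # Only the oss:// root survives the scheme filter.
    assert table._create_external_paths() == ["oss://b1/p1"]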
diff --git a/paimon-python/pypaimon/tests/external_paths_test.py b/paimon-python/pypaimon/tests/external_paths_test.py
new file mode 100644
index 0000000000..1e5a404075
--- /dev/null
+++ b/paimon-python/pypaimon/tests/external_paths_test.py
@@ -0,0 +1,427 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.catalog.catalog import Identifier
+from pypaimon.common.core_options import CoreOptions, ExternalPathStrategy
+from pypaimon.common.external_path_provider import ExternalPathProvider
+
+
+class ExternalPathProviderTest(unittest.TestCase):
+ """Test ExternalPathProvider functionality."""
+
+ def test_path_selection_and_structure(self):
+ """Test path selection (round-robin) and path structure with various
scenarios."""
+ # Test multiple paths with round-robin
+ external_paths = [
+ "oss://bucket1/external",
+ "oss://bucket2/external",
+ "oss://bucket3/external",
+ ]
+ relative_path = "partition=value/bucket-0"
+ provider = ExternalPathProvider(external_paths, relative_path)
+
+ paths = [provider.get_next_external_data_path("file.parquet") for _ in range(6)]
+
+ # Verify all buckets are used (2 cycles = 2 times each)
+ bucket_counts = {f"bucket{i}": sum(1 for p in paths if f"bucket{i}" in
p) for i in [1, 2, 3]}
+ self.assertEqual(bucket_counts["bucket1"], 2)
+ self.assertEqual(bucket_counts["bucket2"], 2)
+ self.assertEqual(bucket_counts["bucket3"], 2)
+
+ # Verify path structure
+ self.assertIn("partition=value", paths[0])
+ self.assertIn("bucket-0", paths[0])
+ self.assertIn("file.parquet", paths[0])
+
+ # Test single path
+ single_provider = ExternalPathProvider(["oss://bucket/external"], "bucket-0")
+ single_path = single_provider.get_next_external_data_path("data.parquet")
+ self.assertIn("bucket/external", single_path)
+ self.assertIn("bucket-0", single_path)
+ self.assertIn("data.parquet", single_path)
+
+ # Test empty relative path
+ empty_provider = ExternalPathProvider(["oss://bucket/external"], "")
+ empty_path = empty_provider.get_next_external_data_path("file.parquet")
+ self.assertIn("bucket/external", empty_path)
+ self.assertIn("file.parquet", empty_path)
+
+
+class ExternalPathsConfigTest(unittest.TestCase):
+ """Test external paths configuration parsing through
FileStoreTable._create_external_paths()."""
+
+ @classmethod
+ def setUpClass(cls):
+ """Set up test environment."""
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, "warehouse")
+ cls.catalog = CatalogFactory.create({"warehouse": cls.warehouse})
+ cls.catalog.create_database("test_db", False)
+
+ @classmethod
+ def tearDownClass(cls):
+ """Clean up test environment."""
+ shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+ def _create_table_with_options(self, options: dict) -> 'FileStoreTable':
+ """Helper method to create a table with specific options."""
+ table_name = "test_db.config_test"
+ # Manually delete table directory if it exists to ensure clean test environment
+ # FileSystemCatalog doesn't have drop_table method, so we need to delete manually
+ try:
+ table_path = self.catalog.get_table_path(Identifier.from_string(table_name))
+ # file_io.exists and delete accept Union[Path, URL, str]
+ if self.catalog.file_io.exists(table_path):
+ self.catalog.file_io.delete(table_path, recursive=True)
+ except Exception:
+ pass # Table may not exist, ignore
+ pa_schema = pa.schema([("id", pa.int32()), ("name", pa.string())])
+ schema = Schema.from_pyarrow_schema(pa_schema, options=options)
+ self.catalog.create_table(table_name, schema, True)
+ return self.catalog.get_table(table_name)
+
+ def test_external_paths_strategies(self):
+ """Test different external path strategies (round-robin, specific-fs,
none)."""
+ # Test round-robin strategy
+ options = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1,oss://bucket2/path2,oss://bucket3/path3",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ table = self._create_table_with_options(options)
+ paths = table._create_external_paths()
+ self.assertEqual(len(paths), 3)
+ self.assertEqual(str(paths[0]), "oss://bucket1/path1")
+ self.assertEqual(str(paths[1]), "oss://bucket2/path2")
+ self.assertEqual(str(paths[2]), "oss://bucket3/path3")
+
+ # Test specific-fs strategy
+ options2 = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1,s3://bucket2/path2,oss://bucket3/path3",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.SPECIFIC_FS,
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_SPECIFIC_FS: "oss",
+ }
+ table2 = self._create_table_with_options(options2)
+ paths2 = table2._create_external_paths()
+ self.assertEqual(len(paths2), 2)
+ self.assertIn("oss://bucket1/path1", [str(p) for p in paths2])
+ self.assertIn("oss://bucket3/path3", [str(p) for p in paths2])
+ self.assertNotIn("s3://bucket2/path2", [str(p) for p in paths2])
+
+ # Test none strategy
+ options3 = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.NONE,
+ }
+ table3 = self._create_table_with_options(options3)
+ paths3 = table3._create_external_paths()
+ self.assertEqual(len(paths3), 0)
+
+ def test_external_paths_edge_cases(self):
+ """Test edge cases: empty string, no config, invalid scheme."""
+ # Test empty string
+ options = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ table = self._create_table_with_options(options)
+ self.assertEqual(len(table._create_external_paths()), 0)
+
+ # Test no external paths option
+ options2 = {CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN}
+ table2 = self._create_table_with_options(options2)
+ self.assertEqual(len(table2._create_external_paths()), 0)
+
+ # Test invalid scheme (no scheme)
+ options3 = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "/invalid/path",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ table3 = self._create_table_with_options(options3)
+ with self.assertRaises(ValueError) as context:
+ table3._create_external_paths()
+ self.assertIn("scheme", str(context.exception))
+
+ def test_create_external_path_provider(self):
+ """Test creating ExternalPathProvider from path factory."""
+ table_name = "test_db.config_test"
+ # Drop table if exists to ensure clean test environment
+ try:
+ self.catalog.drop_table(table_name, True)
+ except Exception:
+ pass # Table may not exist, ignore
+ options = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1,oss://bucket2/path2",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ pa_schema = pa.schema([
+ ("id", pa.int32()),
+ ("name", pa.string()),
+ ("dt", pa.string()),
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"],
options=options)
+ self.catalog.create_table(table_name, schema, True)
+ table = self.catalog.get_table(table_name)
+ path_factory = table.path_factory()
+
+ # Test with external paths configured
+ provider = path_factory.create_external_path_provider(("value1",), 0)
+ self.assertIsNotNone(provider)
+ path = provider.get_next_external_data_path("file.parquet")
+ self.assertTrue("bucket1" in str(path) or "bucket2" in str(path))
+ self.assertIn("dt=value1", str(path))
+ self.assertIn("bucket-0", str(path))
+
+ # Test with none strategy (should return None)
+ # Use a different table name to avoid conflicts
+ options2 = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: "oss://bucket1/path1",
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.NONE,
+ }
+ pa_schema2 = pa.schema([("id", pa.int32()), ("name", pa.string())])
+ schema2 = Schema.from_pyarrow_schema(pa_schema2, options=options2)
+ table_name2 = "test_db.config_test_none"
+ try:
+ # Manually delete table directory if it exists
+ table_path2 = self.catalog.get_table_path(Identifier.from_string(table_name2))
+ # file_io.exists and delete accept Union[Path, URL, str]
+ if self.catalog.file_io.exists(table_path2):
+ self.catalog.file_io.delete(table_path2, recursive=True)
+ except Exception:
+ pass # Table may not exist, ignore
+ self.catalog.create_table(table_name2, schema2, True)
+ table2 = self.catalog.get_table(table_name2)
+ provider2 = table2.path_factory().create_external_path_provider((), 0)
+ self.assertIsNone(provider2)
+
+ # Test without external paths config (should return None)
+ # Use a different table name to avoid conflicts
+ options3 = {}
+ pa_schema3 = pa.schema([("id", pa.int32()), ("name", pa.string())])
+ schema3 = Schema.from_pyarrow_schema(pa_schema3, options=options3)
+ table_name3 = "test_db.config_test_empty"
+ try:
+ # Manually delete table directory if it exists
+ table_path3 = self.catalog.get_table_path(Identifier.from_string(table_name3))
+ # file_io.exists and delete accept Union[Path, URL, str]
+ if self.catalog.file_io.exists(table_path3):
+ self.catalog.file_io.delete(table_path3, recursive=True)
+ except Exception:
+ pass # Table may not exist, ignore
+ self.catalog.create_table(table_name3, schema3, True)
+ table3 = self.catalog.get_table(table_name3)
+ provider3 = table3.path_factory().create_external_path_provider((), 0)
+ self.assertIsNone(provider3)
+
+
+class ExternalPathsIntegrationTest(unittest.TestCase):
+ """Integration tests for external paths feature."""
+
+ @classmethod
+ def setUpClass(cls):
+ """Set up test environment."""
+ cls.temp_dir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.temp_dir, "warehouse")
+ cls.external_dir = os.path.join(cls.temp_dir, "external_data")
+
+ # Create external directory
+ os.makedirs(cls.external_dir, exist_ok=True)
+
+ cls.catalog = CatalogFactory.create({
+ "warehouse": cls.warehouse
+ })
+ cls.catalog.create_database("test_db", False)
+
+ @classmethod
+ def tearDownClass(cls):
+ """Clean up test environment."""
+ shutil.rmtree(cls.temp_dir, ignore_errors=True)
+
+ def test_write_with_external_paths(self):
+ """Test writing data with external paths configured."""
+ pa_schema = pa.schema([
+ ("id", pa.int32()),
+ ("name", pa.string()),
+ ("value", pa.float64()),
+ ])
+
+ # Create table with external paths
+ external_path = f"file://{self.external_dir}"
+ table_options = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: external_path,
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ schema = Schema.from_pyarrow_schema(pa_schema, options=table_options)
+
+ self.catalog.create_table("test_db.external_test", schema, False)
+ table = self.catalog.get_table("test_db.external_test")
+
+ # Write data (use explicit schema to match table schema)
+ data = pa.Table.from_pydict({
+ "id": [1, 2, 3],
+ "name": ["Alice", "Bob", "Charlie"],
+ "value": [10.5, 20.3, 30.7],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ commit_messages = table_write.prepare_commit()
+
+ # Verify external_path is set in file metadata
+ self.assertGreater(len(commit_messages), 0)
+ for commit_msg in commit_messages:
+ self.assertGreater(len(commit_msg.new_files), 0)
+ for file_meta in commit_msg.new_files:
+ # External path should be set
+ self.assertIsNotNone(file_meta.external_path)
+ self.assertTrue(file_meta.external_path.startswith("file://"))
+ self.assertIn(self.external_dir, file_meta.external_path)
+
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ def test_read_with_external_paths(self):
+ """Test reading data with external paths."""
+ pa_schema = pa.schema([
+ ("id", pa.int32()),
+ ("name", pa.string()),
+ ])
+
+ # Create table with external paths
+ external_path = f"file://{self.external_dir}"
+ table_options = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: external_path,
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ schema = Schema.from_pyarrow_schema(pa_schema, options=table_options)
+
+ self.catalog.create_table("test_db.external_read_test", schema, False)
+ table = self.catalog.get_table("test_db.external_read_test")
+
+ # Write data (use explicit schema to match table schema)
+ write_data = pa.Table.from_pydict({
+ "id": [1, 2, 3, 4, 5],
+ "name": ["A", "B", "C", "D", "E"],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(write_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # Read data back
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ result = table_read.to_arrow(table_scan.plan().splits())
+
+ # Verify data
+ self.assertEqual(result.num_rows, 5)
+ self.assertEqual(result.num_columns, 2)
+ self.assertListEqual(result.column("id").to_pylist(), [1, 2, 3, 4, 5])
+ self.assertListEqual(result.column("name").to_pylist(), ["A", "B",
"C", "D", "E"])
+
+ def test_write_without_external_paths(self):
+ """Test that writing without external paths still works."""
+ pa_schema = pa.schema([
+ ("id", pa.int32()),
+ ("name", pa.string()),
+ ])
+
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.catalog.create_table("test_db.normal_test", schema, False)
+ table = self.catalog.get_table("test_db.normal_test")
+
+ # Write data (use explicit schema to match table schema)
+ data = pa.Table.from_pydict({
+ "id": [1, 2, 3],
+ "name": ["X", "Y", "Z"],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ commit_messages = table_write.prepare_commit()
+
+ # Verify external_path is None (not configured)
+ for commit_msg in commit_messages:
+ for file_meta in commit_msg.new_files:
+ self.assertIsNone(file_meta.external_path)
+
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ def test_external_paths_with_partition(self):
+ """Test external paths with partitioned table."""
+ pa_schema = pa.schema([
+ ("id", pa.int32()),
+ ("name", pa.string()),
+ ("dt", pa.string()),
+ ])
+
+ external_path = f"file://{self.external_dir}"
+ table_options = {
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS: external_path,
+ CoreOptions.DATA_FILE_EXTERNAL_PATHS_STRATEGY: ExternalPathStrategy.ROUND_ROBIN,
+ }
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"],
options=table_options)
+
+ self.catalog.create_table("test_db.partitioned_external", schema,
False)
+ table = self.catalog.get_table("test_db.partitioned_external")
+
+ # Write data with partition (use explicit schema to match table schema)
+ data = pa.Table.from_pydict({
+ "id": [1, 2, 3],
+ "name": ["A", "B", "C"],
+ "dt": ["2024-01-01", "2024-01-01", "2024-01-02"],
+ }, schema=pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(data)
+ commit_messages = table_write.prepare_commit()
+
+ # Verify external paths include partition info
+ for commit_msg in commit_messages:
+ for file_meta in commit_msg.new_files:
+ self.assertIsNotNone(file_meta.external_path)
+ # Should contain partition path
+ self.assertIn("dt=", file_meta.external_path)
+
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/paimon-python/pypaimon/utils/__init__.py b/paimon-python/pypaimon/utils/__init__.py
new file mode 100644
index 0000000000..cefe81778c
--- /dev/null
+++ b/paimon-python/pypaimon/utils/__init__.py
@@ -0,0 +1,17 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
diff --git a/paimon-python/pypaimon/utils/file_store_path_factory.py b/paimon-python/pypaimon/utils/file_store_path_factory.py
new file mode 100644
index 0000000000..b99d9d5f29
--- /dev/null
+++ b/paimon-python/pypaimon/utils/file_store_path_factory.py
@@ -0,0 +1,115 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import List, Optional, Tuple
+
+from pypaimon.common.external_path_provider import ExternalPathProvider
+from pypaimon.table.bucket_mode import BucketMode
+
+
+class FileStorePathFactory:
+ MANIFEST_PATH = "manifest"
+ MANIFEST_PREFIX = "manifest-"
+ MANIFEST_LIST_PREFIX = "manifest-list-"
+ INDEX_MANIFEST_PREFIX = "index-manifest-"
+
+ INDEX_PATH = "index"
+ INDEX_PREFIX = "index-"
+
+ STATISTICS_PATH = "statistics"
+ STATISTICS_PREFIX = "stat-"
+
+ BUCKET_PATH_PREFIX = "bucket-"
+
+ def __init__(
+ self,
+ root: str,
+ partition_keys: List[str],
+ default_part_value: str,
+ format_identifier: str,
+ data_file_prefix: str,
+ changelog_file_prefix: str,
+ legacy_partition_name: bool,
+ file_suffix_include_compression: bool,
+ file_compression: str,
+ data_file_path_directory: Optional[str] = None,
+ external_paths: Optional[List[str]] = None,
+ index_file_in_data_file_dir: bool = False,
+ ):
+ self._root = root.rstrip('/')
+ self.partition_keys = partition_keys
+ self.default_part_value = default_part_value
+ self.format_identifier = format_identifier
+ self.data_file_prefix = data_file_prefix
+ self.changelog_file_prefix = changelog_file_prefix
+ self.file_suffix_include_compression = file_suffix_include_compression
+ self.file_compression = file_compression
+ self.data_file_path_directory = data_file_path_directory
+ self.external_paths = external_paths or []
+ self.index_file_in_data_file_dir = index_file_in_data_file_dir
+ self.legacy_partition_name = legacy_partition_name
+
+ def root(self) -> str:
+ return self._root
+
+ def manifest_path(self) -> str:
+ return f"{self._root}/{self.MANIFEST_PATH}"
+
+ def index_path(self) -> str:
+ return f"{self._root}/{self.INDEX_PATH}"
+
+ def statistics_path(self) -> str:
+ return f"{self._root}/{self.STATISTICS_PATH}"
+
+ def data_file_path(self) -> str:
+ if self.data_file_path_directory:
+ return f"{self._root}/{self.data_file_path_directory}"
+ return self._root
+
+ def relative_bucket_path(self, partition: Tuple, bucket: int) -> str:
+ bucket_name = str(bucket)
+ if bucket == BucketMode.POSTPONE_BUCKET.value:
+ bucket_name = "postpone"
+
+ relative_parts = [f"{self.BUCKET_PATH_PREFIX}{bucket_name}"]
+
+ # Add partition path
+ if partition:
+ partition_parts = []
+ for i, field_name in enumerate(self.partition_keys):
+ partition_parts.append(f"{field_name}={partition[i]}")
+ if partition_parts:
+ relative_parts = partition_parts + relative_parts
+
+ # Add data file path directory if specified
+ if self.data_file_path_directory:
+ relative_parts = [self.data_file_path_directory] + relative_parts
+
+ return "/".join(relative_parts)
+
+ def bucket_path(self, partition: Tuple, bucket: int) -> str:
+ relative_path = self.relative_bucket_path(partition, bucket)
+ return f"{self._root}/{relative_path}"
+
+ def create_external_path_provider(
+ self, partition: Tuple, bucket: int
+ ) -> Optional[ExternalPathProvider]:
+ if not self.external_paths:
+ return None
+
+ relative_bucket_path = self.relative_bucket_path(partition, bucket)
+ return ExternalPathProvider(self.external_paths, relative_bucket_path)
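A brief sketch of how the factory above composes paths (illustrative; the constructor values mirror the defaults passed by FileStoreTable.path_factory, and the root is a placeholder):

    from pypaimon.utils.file_store_path_factory import FileStorePathFactory

    factory = FileStorePathFactory(
        root="/warehouse/demo_db.db/tbl",  # placeholder table root
        partition_keys=["dt"],
        default_part_value="__DEFAULT_PARTITION__",
        format_identifier="parquet",
        data_file_prefix="data-",
        changelog_file_prefix="changelog-",
        legacy_partition_name=True,
        file_suffix_include_compression=False,
        file_compression="zstd",
        external_paths=["oss://bucket/external"],
    )
    assert factory.relative_bucket_path(("2024-01-01",), 0) == "dt=2024-01-01/bucket-0"
    assert factory.bucket_path(("2024-01-01",), 0) == "/warehouse/demo_db.db/tbl/dt=2024-01-01/bucket-0"
    provider = factory.create_external_path_provider(("2024-01-01",), 0)
    # provider builds e.g. oss://bucket/external/dt=2024-01-01/bucket-0/<file name>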
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index ba4cee9e90..014e8fbf6e 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -18,7 +18,6 @@
import time
import uuid
-from pathlib import Path
from typing import List
from pypaimon.common.core_options import CoreOptions
@@ -158,14 +157,12 @@ class FileStoreCommit:
delta_record_count -= entry.file.row_count
self.manifest_file_manager.write(new_manifest_file, commit_entries)
# TODO: implement noConflictsOrFail logic
-
partition_columns = list(zip(*(entry.partition.values for entry in commit_entries)))
partition_min_stats = [min(col) for col in partition_columns]
partition_max_stats = [max(col) for col in partition_columns]
partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
if not all(count == 0 for count in partition_null_counts):
raise RuntimeError("Partition value should not be null")
-
manifest_file_path = f"{self.manifest_file_manager.manifest_path}/{new_manifest_file}"
new_manifest_list = ManifestFileMeta(
file_name=new_manifest_file,
@@ -227,14 +224,19 @@ class FileStoreCommit:
raise RuntimeError(f"Failed to commit snapshot
{new_snapshot_id}")
def abort(self, commit_messages: List[CommitMessage]):
+ """Abort commit and delete files. Uses external_path if available to
ensure proper scheme handling."""
for message in commit_messages:
for file in message.new_files:
try:
- file_path_obj = Path(file.file_path)
- if file_path_obj.exists():
- file_path_obj.unlink()
+ path_to_delete = file.external_path if file.external_path else file.file_path
+ if path_to_delete:
+ path_str = str(path_to_delete)
+ self.table.file_io.delete_quietly(path_str)
except Exception as e:
- print(f"Warning: Failed to clean up file {file.file_path}:
{e}")
+ import logging
+ logger = logging.getLogger(__name__)
+ path_to_delete = file.external_path if file.external_path else file.file_path
+ logger.warning(f"Failed to clean up file {path_to_delete} during abort: {e}")
def close(self):
"""Close the FileStoreCommit and release resources."""
diff --git a/paimon-python/pypaimon/write/writer/blob_writer.py b/paimon-python/pypaimon/write/writer/blob_writer.py
index f22577deac..92c7e6ea1d 100644
--- a/paimon-python/pypaimon/write/writer/blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/blob_writer.py
@@ -42,7 +42,7 @@ class BlobWriter(AppendOnlyDataWriter):
self.blob_column = blob_column
options = self.table.options
- self.blob_target_file_size = CoreOptions.get_blob_target_file_size(options)
+ self.blob_target_file_size = CoreOptions.blob_target_file_size(options)
self.current_writer: Optional[BlobFileWriter] = None
self.current_file_path: Optional[str] = None
@@ -117,7 +117,11 @@ class BlobWriter(AppendOnlyDataWriter):
file_name = self.current_file_path.split('/')[-1]
row_count = self.current_writer.row_count
- self._add_file_metadata(file_name, self.current_file_path, row_count, file_size)
+ # Determine if this is an external path
+ is_external_path = self.external_path_provider is not None
+ external_path_str = self.current_file_path if is_external_path else None
+
+ self._add_file_metadata(file_name, self.current_file_path, row_count, file_size, external_path_str)
self.current_writer = None
self.current_file_path = None
@@ -143,10 +147,14 @@ class BlobWriter(AppendOnlyDataWriter):
file_size = self.file_io.get_file_size(file_path)
+ is_external_path = self.external_path_provider is not None
+ external_path_str = file_path if is_external_path else None
+
# Reuse _add_file_metadata for consistency (blob table is append-only, no primary keys)
- self._add_file_metadata(file_name, file_path, data, file_size)
+ self._add_file_metadata(file_name, file_path, data, file_size, external_path_str)
- def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count, file_size: int):
+ def _add_file_metadata(self, file_name: str, file_path: str, data_or_row_count, file_size: int,
+ external_path: Optional[str] = None):
"""Add file metadata to committed_files."""
from datetime import datetime
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
@@ -200,7 +208,7 @@ class BlobWriter(AppendOnlyDataWriter):
delete_row_count=0,
file_source=0, # FileSource.APPEND = 0
value_stats_cols=None,
- external_path=None,
+ external_path=external_path,
first_row_id=None,
write_cols=self.write_cols,
file_path=file_path,
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index b3bcc9219c..26ba1bee7b 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -258,10 +258,14 @@ class DataBlobWriter(DataWriter):
else:
raise ValueError(f"Unsupported file format: {self.file_format}")
- # Generate metadata
- return self._create_data_file_meta(file_name, file_path, data)
+ # Determine if this is an external path
+ is_external_path = self.external_path_provider is not None
+ external_path_str = file_path if is_external_path else None
- def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table) -> DataFileMeta:
+ return self._create_data_file_meta(file_name, file_path, data, external_path_str)
+
+ def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
+ external_path: Optional[str] = None) -> DataFileMeta:
# Column stats (only for normal columns)
column_stats = {
field.name: self._get_column_stats(data, field.name)
@@ -302,6 +306,7 @@ class DataBlobWriter(DataWriter):
delete_row_count=0,
file_source=0,
value_stats_cols=self.normal_column_names,
+ external_path=external_path,
file_path=file_path,
write_cols=self.write_cols)
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index fdc74d6f64..56487094ed 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -23,6 +23,7 @@ from datetime import datetime
from typing import Dict, List, Optional, Tuple
from pypaimon.common.core_options import CoreOptions
+from pypaimon.common.external_path_provider import ExternalPathProvider
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.schema.data_types import PyarrowFieldParser
@@ -46,18 +47,28 @@ class DataWriter(ABC):
self.trimmed_primary_keys = self.table.trimmed_primary_keys
options = self.table.options
- self.target_file_size = CoreOptions.get_target_file_size(options, self.table.is_primary_key_table)
- self.file_format = options.get(CoreOptions.FILE_FORMAT,
- CoreOptions.FILE_FORMAT_PARQUET
- if self.bucket != BucketMode.POSTPONE_BUCKET.value
- else CoreOptions.FILE_FORMAT_AVRO)
- self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+ self.target_file_size = CoreOptions.target_file_size(options, self.table.is_primary_key_table)
+ # POSTPONE_BUCKET uses AVRO format, otherwise default to PARQUET
+ default_format = (
+ CoreOptions.FILE_FORMAT_AVRO
+ if self.bucket == BucketMode.POSTPONE_BUCKET.value
+ else CoreOptions.FILE_FORMAT_PARQUET
+ )
+ self.file_format = CoreOptions.file_format(options, default=default_format)
+ self.compression = CoreOptions.file_compression(options)
self.sequence_generator = SequenceGenerator(max_seq_number)
self.pending_data: Optional[pa.Table] = None
self.committed_files: List[DataFileMeta] = []
self.write_cols = write_cols
- self.blob_as_descriptor = CoreOptions.get_blob_as_descriptor(options)
+ self.blob_as_descriptor = CoreOptions.blob_as_descriptor(options)
+
+ self.path_factory = self.table.path_factory()
+ self.external_path_provider: Optional[ExternalPathProvider] = self.path_factory.create_external_path_provider(
+ self.partition, self.bucket
+ )
+ # Store the current generated external path to preserve scheme in metadata
+ self._current_external_path: Optional[str] = None
def write(self, data: pa.RecordBatch):
try:
@@ -105,13 +116,17 @@ class DataWriter(ABC):
# Delete any files that were written
for file_meta in self.committed_files:
try:
- if file_meta.file_path:
- self.file_io.delete_quietly(file_meta.file_path)
+ # Use external_path if available (contains full URL scheme), otherwise use file_path
+ path_to_delete = file_meta.external_path if file_meta.external_path else file_meta.file_path
+ if path_to_delete:
+ path_str = str(path_to_delete)
+ self.file_io.delete_quietly(path_str)
except Exception as e:
# Log but don't raise - we want to clean up as much as possible
import logging
logger = logging.getLogger(__name__)
- logger.warning(f"Failed to delete file {file_meta.file_path}
during abort: {e}")
+ path_to_delete = file_meta.external_path if
file_meta.external_path else file_meta.file_path
+ logger.warning(f"Failed to delete file {path_to_delete} during
abort: {e}")
# Clean up resources
self.pending_data = None
@@ -145,6 +160,14 @@ class DataWriter(ABC):
return
file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
file_path = self._generate_file_path(file_name)
+
+ is_external_path = self.external_path_provider is not None
+ if is_external_path:
+ # Use the stored external path from _generate_file_path to preserve scheme
+ external_path_str = self._current_external_path if self._current_external_path else None
+ else:
+ external_path_str = None
+
if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
self.file_io.write_parquet(file_path, data,
compression=self.compression)
elif self.file_format == CoreOptions.FILE_FORMAT_ORC:
@@ -211,7 +234,7 @@ class DataWriter(ABC):
delete_row_count=0,
file_source=0,
value_stats_cols=None, # None means all columns in the data have statistics
- external_path=None,
+ external_path=external_path_str, # Set external path if using external paths
first_row_id=None,
write_cols=self.write_cols,
# None means all columns in the table have been written
@@ -219,19 +242,13 @@ class DataWriter(ABC):
))
def _generate_file_path(self, file_name: str) -> str:
- path_builder = str(self.table.table_path)
-
- for i, field_name in enumerate(self.table.partition_keys):
- path_builder = f"{path_builder.rstrip('/')}/{field_name}={str(self.partition[i])}"
-
- if self.bucket == BucketMode.POSTPONE_BUCKET.value:
- bucket_name = "postpone"
- else:
- bucket_name = str(self.bucket)
-
- path_builder = f"{path_builder.rstrip('/')}/bucket-{bucket_name}/{file_name}"
+ if self.external_path_provider:
+ external_path = self.external_path_provider.get_next_external_data_path(file_name)
+ self._current_external_path = external_path
+ return external_path
- return path_builder
+ bucket_path = self.path_factory.bucket_path(self.partition, self.bucket)
+ return f"{bucket_path.rstrip('/')}/{file_name}"
@staticmethod
def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
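Net effect of the writer changes above: when an ExternalPathProvider is configured, _generate_file_path returns the external URI and records it in DataFileMeta.external_path; otherwise files land under the table's bucket path and external_path stays None. Readers (split_read.py) and the abort paths then prefer external_path when it is set. A tiny sketch of that precedence rule (hypothetical helper, not in the commit):

    from typing import Optional

    def resolve_data_path(file_path: str, external_path: Optional[str]) -> str:
        # Mirrors the precedence used in split_read.py and the abort logic:
        # prefer the external URI (which keeps its scheme) over the relative file path.
        return external_path if external_path else file_path

    assert resolve_data_path(
        "dt=2024-01-01/bucket-0/data-0.parquet",
        "oss://bucket/external/dt=2024-01-01/bucket-0/data-0.parquet",
    ).startswith("oss://")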