This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b6c305815c [python] Introduce postpone bucket table (#6074)
b6c305815c is described below
commit b6c305815cfdb11024c637544e182e2407621ec4
Author: umi <[email protected]>
AuthorDate: Thu Aug 14 11:07:53 2025 +0800
[python] Introduce postpone bucket table (#6074)
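
    For reference, a minimal sketch of declaring a postpone bucket table with the
    new option, based on the Schema API touched in this change; the field names
    and the partition/primary keys below are illustrative, not part of the commit:

        import pyarrow as pa

        from pypaimon.schema.schema import Schema

        # Illustrative pyarrow schema; any fields containing the chosen keys work.
        pa_schema = pa.schema([
            ('user_id', pa.int64()),
            ('item_id', pa.int64()),
            ('behavior', pa.string()),
            ('dt', pa.string()),
        ])

        # 'bucket': -2 selects postpone bucket mode: rows are written with bucket -2
        # into "bucket-postpone" directories as Avro files, and plain scans skip them.
        schema = Schema.from_pyarrow_schema(
            pa_schema,
            partition_keys=['dt'],
            primary_keys=['dt', 'user_id'],
            options={'bucket': -2},
        )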
---
.../pypaimon/manifest/manifest_file_manager.py | 2 +-
paimon-python/pypaimon/read/table_scan.py | 25 +++++++----
paimon-python/pypaimon/schema/schema.py | 4 +-
paimon-python/pypaimon/table/bucket_mode.py | 3 ++
paimon-python/pypaimon/table/file_store_table.py | 7 +++-
paimon-python/pypaimon/tests/rest_table_test.py | 48 ++++++++++++++++++++++
paimon-python/pypaimon/tests/writer_test.py | 2 +-
paimon-python/pypaimon/write/row_key_extractor.py | 14 +++++++
paimon-python/pypaimon/write/writer/data_writer.py | 12 +++++-
9 files changed, 101 insertions(+), 16 deletions(-)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index 8f7c8e325c..ef674e5b88 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -73,7 +73,7 @@ class ManifestFileManager:
total_buckets=record['_TOTAL_BUCKETS'],
file=file_meta
)
- if shard_filter and not shard_filter(entry):
+ if not shard_filter(entry):
continue
entries.append(entry)
return entries
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index fa4ade3a5f..2745b1d1b3 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -29,6 +29,7 @@ from pypaimon.read.plan import Plan
from pypaimon.read.split import Split
from pypaimon.schema.data_types import DataField
from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.bucket_mode import BucketMode
from pypaimon.write.row_key_extractor import FixedBucketRowKeyExtractor
@@ -55,6 +56,9 @@ class TableScan:
self.idx_of_this_subtask = None
self.number_of_para_subtasks = None
+        self.only_read_real_buckets = True if self.table.options.get('bucket',
+                                                                      -1) == BucketMode.POSTPONE_BUCKET.value else False
+
def plan(self) -> Plan:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
if not latest_snapshot:
@@ -64,8 +68,7 @@ class TableScan:
file_entries = []
for manifest_file_path in manifest_files:
             manifest_entries = self.manifest_file_manager.read(manifest_file_path,
-                                                                (lambda row: self._shard_filter(row))
-                                                                if self.idx_of_this_subtask is not None else None)
+                                                                lambda row: self._bucket_filter(row))
for entry in manifest_entries:
if entry.kind == 0:
file_entries.append(entry)
@@ -93,13 +96,17 @@ class TableScan:
self.number_of_para_subtasks = number_of_para_subtasks
return self
-    def _shard_filter(self, entry: Optional[ManifestEntry]) -> bool:
-        if self.table.is_primary_key_table:
-            bucket = entry.bucket
-            return bucket % self.number_of_para_subtasks == self.idx_of_this_subtask
-        else:
-            file = entry.file.file_name
-            return FixedBucketRowKeyExtractor.hash(file) % self.number_of_para_subtasks == self.idx_of_this_subtask
+    def _bucket_filter(self, entry: Optional[ManifestEntry]) -> bool:
+        bucket = entry.bucket
+        if self.only_read_real_buckets and bucket < 0:
+            return False
+        if self.idx_of_this_subtask is not None:
+            if self.table.is_primary_key_table:
+                return bucket % self.number_of_para_subtasks == self.idx_of_this_subtask
+            else:
+                file = entry.file.file_name
+                return FixedBucketRowKeyExtractor.hash(file) % self.number_of_para_subtasks == self.idx_of_this_subtask
+        return True
def _apply_push_down_limit(self, splits: List[Split]) -> List[Split]:
if self.limit is None:
diff --git a/paimon-python/pypaimon/schema/schema.py b/paimon-python/pypaimon/schema/schema.py
index 6c3363a586..20e0720087 100644
--- a/paimon-python/pypaimon/schema/schema.py
+++ b/paimon-python/pypaimon/schema/schema.py
@@ -40,7 +40,7 @@ class Schema:
def __init__(self, fields: Optional[List[DataField]] = None,
partition_keys: Optional[List[str]] = None,
primary_keys: Optional[List[str]] = None,
-                 options: Optional[Dict[str, str]] = None, comment: Optional[str] = None):
+                 options: Optional[Dict] = None, comment: Optional[str] = None):
self.fields = fields if fields is not None else []
         self.partition_keys = partition_keys if partition_keys is not None else []
self.primary_keys = primary_keys if primary_keys is not None else []
@@ -49,6 +49,6 @@ class Schema:
@staticmethod
     def from_pyarrow_schema(pa_schema: pa.Schema, partition_keys: Optional[List[str]] = None,
-                            primary_keys: Optional[List[str]] = None, options: Optional[Dict[str, str]] = None,
+                            primary_keys: Optional[List[str]] = None, options: Optional[Dict] = None,
                             comment: Optional[str] = None):
         return Schema(PyarrowFieldParser.to_paimon_schema(pa_schema), partition_keys, primary_keys, options, comment)
diff --git a/paimon-python/pypaimon/table/bucket_mode.py b/paimon-python/pypaimon/table/bucket_mode.py
index df6c8e949d..22e79bf3c3 100644
--- a/paimon-python/pypaimon/table/bucket_mode.py
+++ b/paimon-python/pypaimon/table/bucket_mode.py
@@ -27,3 +27,6 @@ class BucketMode(Enum):
HASH_DYNAMIC = auto()
CROSS_PARTITION = auto()
BUCKET_UNAWARE = auto()
+ POSTPONE_MODE = auto()
+
+ POSTPONE_BUCKET = -2
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 1ceb007fae..4fcef0473d 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -29,6 +29,7 @@ from pypaimon.table.table import Table
from pypaimon.write.batch_write_builder import BatchWriteBuilder
from pypaimon.write.row_key_extractor import (DynamicBucketRowKeyExtractor,
FixedBucketRowKeyExtractor,
+ PostponeBucketRowKeyExtractor,
RowKeyExtractor,
UnawareBucketRowKeyExtractor)
@@ -52,7 +53,9 @@ class FileStoreTable(Table):
def bucket_mode(self) -> BucketMode:
if self.is_primary_key_table:
- if self.options.get(CoreOptions.BUCKET, -1) == -1:
+ if self.options.get(CoreOptions.BUCKET, -1) == -2:
+ return BucketMode.POSTPONE_MODE
+ elif self.options.get(CoreOptions.BUCKET, -1) == -1:
if self.cross_partition_update:
return BucketMode.CROSS_PARTITION
else:
@@ -77,6 +80,8 @@ class FileStoreTable(Table):
return FixedBucketRowKeyExtractor(self.table_schema)
elif bucket_mode == BucketMode.BUCKET_UNAWARE:
return UnawareBucketRowKeyExtractor(self.table_schema)
+ elif bucket_mode == BucketMode.POSTPONE_MODE:
+ return PostponeBucketRowKeyExtractor(self.table_schema)
         elif bucket_mode == BucketMode.HASH_DYNAMIC or bucket_mode == BucketMode.CROSS_PARTITION:
return DynamicBucketRowKeyExtractor(self.table_schema)
else:
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest_table_test.py
index a1feed8d50..9c64c17003 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_test.py
@@ -15,6 +15,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import glob
+import os
import pyarrow as pa
@@ -145,3 +147,49 @@ class RESTTableTest(RESTCatalogBaseTest):
}
expected = pa.Table.from_pydict(data_expected, schema=self.pa_schema)
self.assertEqual(actual, expected)
+
+ def test_postpone_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+ self.rest_catalog.create_table('default.test_postpone', schema, False)
+ table = self.rest_catalog.get_table('default.test_postpone')
+
+ expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/LATEST"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/snapshot-1"))
+        self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/manifest"))
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
+
+ def test_postpone_read_write(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['user_id'], primary_keys=['user_id', 'dt'],
+                                            options={'bucket': -2})
+ self.rest_catalog.create_table('default.test_postpone', schema, False)
+ table = self.rest_catalog.get_table('default.test_postpone')
+
+ expect = pa.Table.from_pydict(self.data, schema=self.pa_schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ table_write.write_arrow(expect)
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ actual = table_read.to_arrow(splits)
+ self.assertTrue(not actual)
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
index 1d1cec7382..f1f6bb526b 100644
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -27,7 +27,7 @@ from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.schema.schema import Schema
-class WriterTestCase(unittest.TestCase):
+class WriterTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index 801cc5fff4..bec8e08fb7 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -24,6 +24,7 @@ import pyarrow as pa
from pypaimon.common.core_options import CoreOptions
from pypaimon.schema.table_schema import TableSchema
+from pypaimon.table.bucket_mode import BucketMode
class RowKeyExtractor(ABC):
@@ -125,3 +126,16 @@ class DynamicBucketRowKeyExtractor(RowKeyExtractor):
def _extract_buckets_batch(self, data: pa.RecordBatch) -> int:
         raise ValueError("Can't extract bucket from row in dynamic bucket mode")
+
+
+class PostponeBucketRowKeyExtractor(RowKeyExtractor):
+    """Extractor for postpone bucket mode (bucket = -2)."""
+
+ def __init__(self, table_schema: TableSchema):
+ super().__init__(table_schema)
+ num_buckets = table_schema.options.get(CoreOptions.BUCKET, -2)
+ if num_buckets != BucketMode.POSTPONE_BUCKET.value:
+            raise ValueError(f"Postpone bucket mode requires bucket = -2, got {num_buckets}")
+
+ def _extract_buckets_batch(self, data: pa.RecordBatch) -> List[int]:
+ return [BucketMode.POSTPONE_BUCKET.value] * data.num_rows
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index c84cd5d39a..c11d991b84 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -25,6 +25,7 @@ import pyarrow as pa
from pypaimon.common.core_options import CoreOptions
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.table.bucket_mode import BucketMode
from pypaimon.table.row.binary_row import BinaryRow
@@ -44,7 +45,10 @@ class DataWriter(ABC):
options = self.table.options
self.target_file_size = 256 * 1024 * 1024
-        self.file_format = options.get(CoreOptions.FILE_FORMAT, CoreOptions.FILE_FORMAT_PARQUET)
+        self.file_format = options.get(CoreOptions.FILE_FORMAT,
+                                       CoreOptions.FILE_FORMAT_PARQUET
+                                       if self.bucket != BucketMode.POSTPONE_BUCKET.value
+                                       else CoreOptions.FILE_FORMAT_AVRO)
self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
self.pending_data: Optional[pa.RecordBatch] = None
@@ -134,7 +138,11 @@ class DataWriter(ABC):
for i, field_name in enumerate(self.table.partition_keys):
             path_builder = path_builder / (field_name + "=" + str(self.partition[i]))
-        path_builder = path_builder / ("bucket-" + str(self.bucket)) / file_name
+ if self.bucket == BucketMode.POSTPONE_BUCKET.value:
+ bucket_name = "postpone"
+ else:
+ bucket_name = str(self.bucket)
+ path_builder = path_builder / ("bucket-" + bucket_name) / file_name
return path_builder
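
The path change above boils down to a different bucket directory name for postpone
writes. A simplified, self-contained sketch of that naming rule; the helper
bucket_dir is hypothetical and not part of DataWriter:

    from pathlib import Path

    POSTPONE_BUCKET = -2  # mirrors BucketMode.POSTPONE_BUCKET.value

    def bucket_dir(bucket: int) -> str:
        # Postpone writes go to "bucket-postpone"; everything else keeps "bucket-<n>".
        return "bucket-postpone" if bucket == POSTPONE_BUCKET else "bucket-" + str(bucket)

    # e.g. <warehouse>/default/test_postpone/user_id=2/bucket-postpone/<data-file>.avro
    print(Path("user_id=2") / bucket_dir(POSTPONE_BUCKET) / "data-file.avro")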