This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bc942cd3bb [python] Refactor update by row id to TableUpdate
bc942cd3bb is described below
commit bc942cd3bbb315c94716e722d0e280383ea65efe
Author: JingsongLi <[email protected]>
AuthorDate: Tue Dec 23 21:04:42 2025 +0800
[python] Refactor update by row id to TableUpdate
---
docs/content/program-api/python-api.md | 128 +++------------------
..._columns_write_test.py => table_update_test.py} | 83 ++++++-------
paimon-python/pypaimon/write/table_update.py | 46 ++++++++
...l_column_write.py => table_update_by_row_id.py} | 6 +-
paimon-python/pypaimon/write/table_write.py | 31 +----
paimon-python/pypaimon/write/write_builder.py | 18 ++-
6 files changed, 110 insertions(+), 202 deletions(-)
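In short, this change replaces the `update_columns_by_row_id()` flag on `WriteBuilder` with a dedicated `TableUpdate` entry point. A minimal before/after sketch of the two call patterns, assuming `table` is an existing data evolution table handle (catalog setup omitted):

```python
import pyarrow as pa

# data patching column 'f0' by row id; the _ROW_ID column is required
data = pa.Table.from_pydict({
    '_ROW_ID': [0, 1],
    'f0': [5, 6],
}, schema=pa.schema([('_ROW_ID', pa.int64()), ('f0', pa.int8())]))

# before this commit (removed API):
#   write_builder = table.new_batch_write_builder().update_columns_by_row_id()
#   table_write = write_builder.new_write().with_write_type(['f0'])
#   table_write.write_arrow(data)
#   cmts = table_write.prepare_commit()

# after this commit:
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['f0'])
cmts = table_update.update_by_arrow_with_row_id(data)

table_commit = write_builder.new_commit()
table_commit.commit(cmts)
table_commit.close()
```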
diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index 12c15c8f6d..c1b0615425 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -213,9 +213,12 @@ write_builder = table.new_batch_write_builder().overwrite()
write_builder = table.new_batch_write_builder().overwrite({'dt': '2024-01-01'})
```
-### Write partial columns
+### Update columns
+
+You can use `TableUpdate.update_by_arrow_with_row_id` to update columns in data evolution tables.
-when enable data-evolution, you can write partial columns to table:
+The input data must include the `_ROW_ID` column. The update operation will automatically sort the rows and match each `_ROW_ID` to
+its corresponding `first_row_id`, then group rows with the same `first_row_id` and write each group to a separate file.
```python
simple_pa_schema = pa.schema([
@@ -240,129 +243,24 @@ table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()
-# write partial columns
-table_write = write_builder.new_write().with_write_type(['f0'])
-table_commit = write_builder.new_commit()
-data2 = pa.Table.from_pydict({
- 'f0': [3, 4],
-}, schema=pa.schema([
- ('f0', pa.int8()),
-]))
-table_write.write_arrow(data2)
-cmts = table_write.prepare_commit()
-
-# assign first row id
-cmts[0].new_files[0].first_row_id = 0
-table_commit.commit(cmts)
-table_write.close()
-table_commit.close()
-```
-
-Paimon data-evolution table use `first_row_id` to split files, when write partial columns,
-you should split data into multiple parts by rows, and assign `first_row_id` for each file before commit
-, or it may cause some fatal error during table reads.
-
-For example, in the following code, `write-1` will generate a file with `first_row_id=0` which contains 2 rows,
-and `write-2` will generate a file with `first_row_id=2` which also contains 2 rows. Then, if we update column `f0` and
-do not split data into multiple parts by rows, the generated file would have `first_row_id=0` and contains 4 rows, when reading
-this table, it will cause a fatal error.
-
-```python
-table = catalog.get_table('default.test_row_tracking')
-
-# write-1
-write_builder = table.new_batch_write_builder()
-table_write = write_builder.new_write()
-table_commit = write_builder.new_commit()
-expect_data = pa.Table.from_pydict({
- 'f0': [-1, 2],
- 'f1': [-1001, 1002]
-}, schema=simple_pa_schema)
-table_write.write_arrow(expect_data)
-table_commit.commit(table_write.prepare_commit())
-table_write.close()
-table_commit.close()
-
-# write-2
-table_write = write_builder.new_write()
-table_commit = write_builder.new_commit()
-expect_data = pa.Table.from_pydict({
- 'f0': [3, 4],
- 'f1': [1003, 1004]
-}, schema=simple_pa_schema)
-table_write.write_arrow(expect_data)
-table_commit.commit(table_write.prepare_commit())
-table_write.close()
-table_commit.close()
-
-# write partial columns
-table_write = write_builder.new_write().with_write_type(['f0'])
-table_commit = write_builder.new_commit()
-data2 = pa.Table.from_pydict({
- 'f0': [5, 6, 7, 8],
-}, schema=pa.schema([
- ('f0', pa.int8()),
-]))
-table_write.write_arrow(data2)
-cmts = table_write.prepare_commit()
-cmts[0].new_files[0].first_row_id = 0
-table_commit.commit(cmts)
-table_write.close()
-table_commit.close()
-
-read_builder = table.new_read_builder()
-table_scan = read_builder.new_scan()
-table_read = read_builder.new_read()
-
-# a fatal error will be thrown
-actual_data = table_read.to_arrow(table_scan.plan().splits())
-```
-
-### Update columns
-
-Handle file `first_row_id` manually is inconvenient and error-prone. If you don't want to do this, you can enable `update_columns_by_row_id`
-when create `WriteBuilder` and set write type for `TableWrite`, then you can write partial columns without handling file `first_row_id`.
-The input data should include the `_ROW_ID` column, writing operation will automatically sort and match each `_ROW_ID` to
-its corresponding `first_row_id`, then groups rows with the same `first_row_id` and writes them to a separate file.
-
-```python
-table = catalog.get_table('default.test_row_tracking')
-
-# write-1
-# same as above
-
-# write-2
-# same as above
-
# update partial columns
-write_builder = table.new_batch_write_builder().update_columns_by_row_id()
-table_write = write_builder.new_write().with_write_type(['f0'])
+write_builder = table.new_batch_write_builder()
+table_update = write_builder.new_update().with_update_type(['f0'])
table_commit = write_builder.new_commit()
data2 = pa.Table.from_pydict({
- '_ROW_ID': [0, 1, 2, 3],
- 'f0': [5, 6, 7, 8],
+ '_ROW_ID': [0, 1],
+ 'f0': [5, 6],
}, schema=pa.schema([
('_ROW_ID', pa.int64()),
('f0', pa.int8()),
]))
-table_write.write_arrow(data2)
-cmts = table_write.prepare_commit()
+cmts = table_update.update_by_arrow_with_row_id(data2)
table_commit.commit(cmts)
-table_write.close()
table_commit.close()
-read_builder = table.new_read_builder()
-table_scan = read_builder.new_scan()
-table_read = read_builder.new_read()
-actual_data = table_read.to_arrow(table_scan.plan().splits())
-expect_data = pa.Table.from_pydict({
- 'f0': [5, 6, 7, 8],
- 'f1': [-1001, 1002, 1003, 1004]
-}, schema=pa.schema([
- ('f0', pa.int8()),
- ('f1', pa.int16()),
-]))
-self.assertEqual(actual_data, expect_data)
+# content should be:
+# 'f0': [5, 6],
+# 'f1': [-1001, 1002]
```
## Batch Read
diff --git a/paimon-python/pypaimon/tests/partial_columns_write_test.py b/paimon-python/pypaimon/tests/table_update_test.py
similarity index 80%
rename from paimon-python/pypaimon/tests/partial_columns_write_test.py
rename to paimon-python/pypaimon/tests/table_update_test.py
index 6f3b0b75f6..a3fd3022e2 100644
--- a/paimon-python/pypaimon/tests/partial_columns_write_test.py
+++ b/paimon-python/pypaimon/tests/table_update_test.py
@@ -25,7 +25,7 @@ import pyarrow as pa
from pypaimon import CatalogFactory, Schema
-class PartialColumnsWriteTest(unittest.TestCase):
+class TableUpdateTest(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.tempdir = tempfile.mkdtemp()
@@ -101,7 +101,7 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Create table with initial data
table = self._create_table()
- # Create data evolution writer using BatchTableWrite
+ # Create data evolution table update
write_builder = table.new_batch_write_builder()
batch_write = write_builder.new_write()
@@ -112,10 +112,9 @@ class PartialColumnsWriteTest(unittest.TestCase):
})
# Update the age column
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age'])
- batch_write.write_arrow(update_data)
- commit_messages = batch_write.prepare_commit()
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age'])
+ commit_messages = table_update.update_by_arrow_with_row_id(update_data)
# Commit the changes
table_commit = write_builder.new_commit()
@@ -139,7 +138,7 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Create table with initial data
table = self._create_table()
- # Create data evolution writer using BatchTableWrite
+ # Create data evolution table update
write_builder = table.new_batch_write_builder()
batch_write = write_builder.new_write()
@@ -151,10 +150,9 @@ class PartialColumnsWriteTest(unittest.TestCase):
})
# Update multiple columns
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age', 'city'])
- batch_write.write_arrow(update_data)
- commit_messages = batch_write.prepare_commit()
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age', 'city'])
+ commit_messages = table_update.update_by_arrow_with_row_id(update_data)
# Commit the changes
table_commit = write_builder.new_commit()
@@ -182,10 +180,6 @@ class PartialColumnsWriteTest(unittest.TestCase):
"""Test that updating a non-existent column raises an error."""
table = self._create_table()
- # Create data evolution writer using BatchTableWrite
- write_builder = table.new_batch_write_builder()
- batch_write = write_builder.new_write()
-
# Try to update a non-existent column
update_data = pa.Table.from_pydict({
'_ROW_ID': [0, 1, 2, 3, 4],
@@ -194,18 +188,17 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Should raise ValueError
with self.assertRaises(ValueError) as context:
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['nonexistent_column'])
- batch_write.write_arrow(update_data)
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['nonexistent_column'])
+ table_update.update_by_arrow_with_row_id(update_data)
self.assertIn('not in table schema', str(context.exception))
- batch_write.close()
def test_missing_row_id_column(self):
"""Test that missing row_id column raises an error."""
table = self._create_table()
- # Create data evolution writer using BatchTableWrite
+ # Create data evolution table update
write_builder = table.new_batch_write_builder()
batch_write = write_builder.new_write()
@@ -216,9 +209,9 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Should raise ValueError
with self.assertRaises(ValueError) as context:
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age'])
- batch_write.write_arrow(update_data)
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age'])
+ table_update.update_by_arrow_with_row_id(update_data)
self.assertIn("Input data must contain _ROW_ID column",
str(context.exception))
batch_write.close()
@@ -247,9 +240,9 @@ class PartialColumnsWriteTest(unittest.TestCase):
table_write.close()
table_commit.close()
- # Create data evolution writer using BatchTableWrite
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age'])
+ # Create data evolution table update
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age'])
# Update ages
update_data = pa.Table.from_pydict({
@@ -257,14 +250,12 @@ class PartialColumnsWriteTest(unittest.TestCase):
'age': [31, 26, 36, 41, 46]
})
- batch_write.write_arrow(update_data)
- commit_messages = batch_write.prepare_commit()
+ commit_messages = table_update.update_by_arrow_with_row_id(update_data)
# Commit the changes
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()
- batch_write.close()
# Verify the updated data
read_builder = table.new_read_builder()
@@ -283,16 +274,15 @@ class PartialColumnsWriteTest(unittest.TestCase):
table = self._create_table()
# First update: Update age column
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age'])
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age'])
update_age_data = pa.Table.from_pydict({
'_ROW_ID': [1, 0, 2, 3, 4],
'age': [31, 26, 36, 41, 46]
})
- batch_write.write_arrow(update_age_data)
- commit_messages = batch_write.prepare_commit()
+ commit_messages = table_update.update_by_arrow_with_row_id(update_age_data)
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()
@@ -302,17 +292,12 @@ class PartialColumnsWriteTest(unittest.TestCase):
'_ROW_ID': [1, 0, 2, 3, 4],
'city': ['Los Angeles', 'New York', 'Chicago', 'Phoenix', 'Houston']
})
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['city'])
- batch_write.write_arrow(update_city_data)
- commit_messages = batch_write.prepare_commit()
+ table_update.with_update_type(['city'])
+ commit_messages = table_update.update_by_arrow_with_row_id(update_city_data)
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()
- # Close the batch write
- batch_write.close()
-
# Verify both columns were updated correctly
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
@@ -333,9 +318,9 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Create table with initial data
table = self._create_table()
- # Create data evolution writer using BatchTableWrite
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age'])
+ # Create data evolution table update
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age'])
# Prepare update data with wrong row count (only 3 rows instead of 5)
update_data = pa.Table.from_pydict({
@@ -345,19 +330,18 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Should raise ValueError for total row count mismatch
with self.assertRaises(ValueError) as context:
- batch_write.write_arrow(update_data)
+ table_update.update_by_arrow_with_row_id(update_data)
self.assertIn("does not match table total row count",
str(context.exception))
- batch_write.close()
def test_wrong_first_row_id_row_count(self):
"""Test that wrong row count for a first_row_id raises an error."""
# Create table with initial data
table = self._create_table()
- # Create data evolution writer using BatchTableWrite
- write_builder = table.new_batch_write_builder().update_columns_by_row_id()
- batch_write = write_builder.new_write().with_write_type(['age'])
+ # Create data evolution table update
+ write_builder = table.new_batch_write_builder()
+ table_update = write_builder.new_update().with_update_type(['age'])
# Prepare update data with duplicate row_id (violates monotonically increasing)
update_data = pa.Table.from_pydict({
@@ -367,10 +351,9 @@ class PartialColumnsWriteTest(unittest.TestCase):
# Should raise ValueError for row ID validation
with self.assertRaises(ValueError) as context:
- batch_write.write_arrow(update_data)
+ table_update.update_by_arrow_with_row_id(update_data)
self.assertIn("Row IDs are not monotonically increasing",
str(context.exception))
- batch_write.close()
if __name__ == '__main__':
unittest.main()
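One behavior worth noting from the tests above: a `TableUpdate` instance can be retargeted between commits by calling `with_update_type` again, as the sequential-updates test does for `age` and then `city`. A condensed sketch of that flow, with `update_age_data` and `update_city_data` as in the test:

```python
write_builder = table.new_batch_write_builder()
table_update = write_builder.new_update().with_update_type(['age'])

# first update: the 'age' column
commit_messages = table_update.update_by_arrow_with_row_id(update_age_data)
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()

# second update: reuse the same TableUpdate, retargeted to 'city'
table_update.with_update_type(['city'])
commit_messages = table_update.update_by_arrow_with_row_id(update_city_data)
table_commit = write_builder.new_commit()
table_commit.commit(commit_messages)
table_commit.close()
```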
diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py
new file mode 100644
index 0000000000..baaf3f18e3
--- /dev/null
+++ b/paimon-python/pypaimon/write/table_update.py
@@ -0,0 +1,46 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+from typing import List
+
+import pyarrow as pa
+
+from pypaimon.write.commit_message import CommitMessage
+from pypaimon.write.table_update_by_row_id import TableUpdateByRowId
+
+
+class TableUpdate:
+    def __init__(self, table, commit_user):
+        from pypaimon.table.file_store_table import FileStoreTable
+
+        self.table: FileStoreTable = table
+        self.commit_user = commit_user
+        self.update_cols = None
+
+    def with_update_type(self, update_cols: List[str]):
+        for col in update_cols:
+            if col not in self.table.field_names:
+                raise ValueError(f"Column {col} is not in table schema.")
+        if len(update_cols) == len(self.table.field_names):
+            update_cols = None
+        self.update_cols = update_cols
+        return self
+
+    def update_by_arrow_with_row_id(self, table: pa.Table) -> List[CommitMessage]:
+        update_by_row_id = TableUpdateByRowId(self.table, self.commit_user)
+        update_by_row_id.update_columns(table, self.update_cols)
+        return update_by_row_id.commit_messages
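Two details of `with_update_type` above are easy to miss: unknown columns fail fast with a `ValueError`, and listing every table column normalizes `update_cols` to `None`, which makes the update equivalent to a full-row write. A small illustration, assuming `table` exposes `field_names` as the class itself relies on:

```python
table_update = table.new_batch_write_builder().new_update()

# unknown columns are rejected before any data is read
try:
    table_update.with_update_type(['nonexistent_column'])
except ValueError as e:
    print(e)  # Column nonexistent_column is not in table schema.

# naming every column collapses to update_cols = None (full-row update)
table_update.with_update_type(list(table.field_names))
assert table_update.update_cols is None
```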
diff --git a/paimon-python/pypaimon/write/partial_column_write.py b/paimon-python/pypaimon/write/table_update_by_row_id.py
similarity index 98%
rename from paimon-python/pypaimon/write/partial_column_write.py
rename to paimon-python/pypaimon/write/table_update_by_row_id.py
index c60464f306..fb854dbccc 100644
--- a/paimon-python/pypaimon/write/partial_column_write.py
+++ b/paimon-python/pypaimon/write/table_update_by_row_id.py
@@ -27,11 +27,11 @@ from pypaimon.table.special_fields import SpecialFields
from pypaimon.write.file_store_write import FileStoreWrite
-class PartialColumnWrite:
+class TableUpdateByRowId:
"""
- Table write for partial column updates (data evolution).
+ Table update for partial column updates (data evolution).
- This writer is designed for adding/updating specific columns in existing tables.
+ This updater is designed for adding/updating specific columns in existing tables.
Input data should contain _ROW_ID column.
"""
diff --git a/paimon-python/pypaimon/write/table_write.py b/paimon-python/pypaimon/write/table_write.py
index 8bc2c023ee..0ac73356a3 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
from collections import defaultdict
-from typing import List, Optional
+from typing import List
import pyarrow as pa
@@ -24,7 +24,6 @@ from pypaimon.schema.data_types import PyarrowFieldParser
from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
from pypaimon.write.commit_message import CommitMessage
from pypaimon.write.file_store_write import FileStoreWrite
-from pypaimon.write.partial_column_write import PartialColumnWrite
class TableWrite:
@@ -81,39 +80,15 @@ class TableWrite:
class BatchTableWrite(TableWrite):
- def __init__(self, table, commit_user, update_columns_by_row_id=False):
+ def __init__(self, table, commit_user):
super().__init__(table, commit_user)
self.batch_committed = False
- self._partial_column_write: Optional[PartialColumnWrite] = None
- if update_columns_by_row_id:
- self._partial_column_write = PartialColumnWrite(self.table, self.commit_user)
-
- def write_arrow(self, table: pa.Table):
- if self._partial_column_write is not None:
- return self._partial_column_write.update_columns(table, self.file_store_write.write_cols)
- super().write_arrow(table)
-
- def write_arrow_batch(self, data: pa.RecordBatch):
- if self._partial_column_write is not None:
- table = pa.Table.from_batches([data])
- return self._partial_column_write.update_columns(table, self.file_store_write.write_cols)
- super().write_arrow_batch(data)
-
- def write_pandas(self, dataframe):
- if self._partial_column_write is not None:
- table = pa.Table.from_pandas(dataframe)
- return self._partial_column_write.update_columns(table, self.file_store_write.write_cols)
- super().write_pandas(dataframe)
def prepare_commit(self) -> List[CommitMessage]:
if self.batch_committed:
raise RuntimeError("BatchTableWrite only supports one-time
committing.")
self.batch_committed = True
-
- if self._partial_column_write is not None:
- return self._partial_column_write.commit_messages
- else:
- return self.file_store_write.prepare_commit(BATCH_COMMIT_IDENTIFIER)
+ return self.file_store_write.prepare_commit(BATCH_COMMIT_IDENTIFIER)
class StreamTableWrite(TableWrite):
diff --git a/paimon-python/pypaimon/write/write_builder.py b/paimon-python/pypaimon/write/write_builder.py
index c83c11e746..f2a90af0bf 100644
--- a/paimon-python/pypaimon/write/write_builder.py
+++ b/paimon-python/pypaimon/write/write_builder.py
@@ -22,6 +22,7 @@ from typing import Optional
from pypaimon.write.table_commit import (BatchTableCommit, StreamTableCommit,
TableCommit)
+from pypaimon.write.table_update import TableUpdate
from pypaimon.write.table_write import (BatchTableWrite, StreamTableWrite,
TableWrite)
@@ -33,7 +34,6 @@ class WriteBuilder(ABC):
self.table: FileStoreTable = table
self.commit_user = self._create_commit_user()
self.static_partition = None
- self._update_columns_by_row_id = False
def overwrite(self, static_partition: Optional[dict] = None):
self.static_partition = static_partition if static_partition is not None else {}
@@ -42,6 +42,9 @@ class WriteBuilder(ABC):
def new_write(self) -> TableWrite:
"""Returns a table write."""
+ def new_update(self) -> TableUpdate:
+ """Returns a table update."""
+
def new_commit(self) -> TableCommit:
"""Returns a table commit."""
@@ -52,15 +55,14 @@ class WriteBuilder(ABC):
else:
return str(uuid.uuid4())
- def update_columns_by_row_id(self):
- self._update_columns_by_row_id = True
- return self
-
class BatchWriteBuilder(WriteBuilder):
def new_write(self) -> BatchTableWrite:
- return BatchTableWrite(self.table, self.commit_user, self._update_columns_by_row_id)
+ return BatchTableWrite(self.table, self.commit_user)
+
+ def new_update(self) -> TableUpdate:
+ return TableUpdate(self.table, self.commit_user)
def new_commit(self) -> BatchTableCommit:
commit = BatchTableCommit(self.table, self.commit_user, self.static_partition)
@@ -68,9 +70,13 @@ class BatchWriteBuilder(WriteBuilder):
class StreamWriteBuilder(WriteBuilder):
+
def new_write(self) -> StreamTableWrite:
return StreamTableWrite(self.table, self.commit_user)
+ def new_update(self) -> TableUpdate:
+ raise ValueError("StreamWriteBuilder.new_update() not supported.")
+
def new_commit(self) -> StreamTableCommit:
commit = StreamTableCommit(self.table, self.commit_user, self.static_partition)
return commit
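Putting the pieces together, an end-to-end sketch of the batch flow after this refactor, mirroring the updated docs (the `catalog` handle and data evolution table creation are assumed to follow the Python API docs; only identifiers that appear in this commit are used). Note that `StreamWriteBuilder.new_update()` deliberately raises, so this applies to batch writes only:

```python
import pyarrow as pa

simple_pa_schema = pa.schema([('f0', pa.int8()), ('f1', pa.int16())])
table = catalog.get_table('default.test_row_tracking')  # data evolution table, setup assumed

# initial full-row write
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
table_write.write_arrow(pa.Table.from_pydict(
    {'f0': [-1, 2], 'f1': [-1001, 1002]}, schema=simple_pa_schema))
table_commit.commit(table_write.prepare_commit())
table_write.close()
table_commit.close()

# partial-column update by row id through the new TableUpdate
table_update = write_builder.new_update().with_update_type(['f0'])
data2 = pa.Table.from_pydict(
    {'_ROW_ID': [0, 1], 'f0': [5, 6]},
    schema=pa.schema([('_ROW_ID', pa.int64()), ('f0', pa.int8())]))
table_commit = write_builder.new_commit()
table_commit.commit(table_update.update_by_arrow_with_row_id(data2))
table_commit.close()

# read back: expect f0 = [5, 6], f1 = [-1001, 1002]
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
print(table_read.to_arrow(table_scan.plan().splits()))
```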