This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new bee7289d3f [Python] Add basic tests for schema evolution read (#6463)
bee7289d3f is described below
commit bee7289d3fc1658635e23b87302c2847b0da62df
Author: umi <[email protected]>
AuthorDate: Thu Oct 23 17:39:15 2025 +0800
[Python] Add basic tests for schema evolution read (#6463)
---
.../pypaimon/tests/schema_evolution_read_test.py | 191 ++++++++++++++++-----
1 file changed, 146 insertions(+), 45 deletions(-)
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index 046f84487d..2ff4b09e53 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -21,6 +21,7 @@ import shutil
import tempfile
import unittest
+import pandas
import pyarrow as pa
from pypaimon import CatalogFactory, Schema
@@ -127,22 +128,23 @@ class SchemaEvolutionReadTest(unittest.TestCase):
}, schema=pa_schema)
self.assertEqual(expected, actual)
- def test_schema_evolution_with_read_filter(self):
+ def test_schema_evolution_type(self):
# schema 0
pa_schema = pa.schema([
('user_id', pa.int64()),
- ('item_id', pa.int64()),
+ ('time', pa.timestamp('s')),
('dt', pa.string())
])
schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
- self.catalog.create_table('default.test_schema_evolution_with_filter', schema, False)
- table1 = self.catalog.get_table('default.test_schema_evolution_with_filter')
+ self.catalog.create_table('default.schema_evolution_type', schema, False)
+ table1 = self.catalog.get_table('default.schema_evolution_type')
write_builder = table1.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data1 = {
'user_id': [1, 2, 3, 4],
- 'item_id': [1001, 1002, 1003, 1004],
+ 'time': [pandas.Timestamp("2025-01-01 00:00:00"), pandas.Timestamp("2025-01-02 00:02:00"),
+ pandas.Timestamp("2025-01-03 00:03:00"), pandas.Timestamp("2025-01-04 00:04:00")],
'dt': ['p1', 'p1', 'p2', 'p1'],
}
pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
@@ -153,21 +155,22 @@ class SchemaEvolutionReadTest(unittest.TestCase):
# schema 1 add behavior column
pa_schema = pa.schema([
- ('user_id', pa.int64()),
- ('item_id', pa.int64()),
+ ('user_id', pa.int8()),
+ ('time', pa.timestamp('ms')),
('dt', pa.string()),
('behavior', pa.string())
])
schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
- self.catalog.create_table('default.test_schema_evolution_with_filter2', schema2, False)
- table2 = self.catalog.get_table('default.test_schema_evolution_with_filter2')
+ self.catalog.create_table('default.schema_evolution_type2', schema2, False)
+ table2 = self.catalog.get_table('default.schema_evolution_type2')
table2.table_schema.id = 1
write_builder = table2.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
data2 = {
'user_id': [5, 6, 7, 8],
- 'item_id': [1005, 1006, 1007, 1008],
+ 'time': [pandas.Timestamp("2025-01-05 00:05:00"), pandas.Timestamp("2025-01-06 00:06:00"),
+ pandas.Timestamp("2025-01-07 00:07:00"), pandas.Timestamp("2025-01-08 00:08:00")],
'dt': ['p2', 'p1', 'p2', 'p2'],
'behavior': ['e', 'f', 'g', 'h'],
}
@@ -181,49 +184,22 @@ class SchemaEvolutionReadTest(unittest.TestCase):
schema_manager = SchemaManager(table2.file_io, table2.table_path)
schema_manager.commit(TableSchema.from_schema(schema_id=0, schema=schema))
schema_manager.commit(TableSchema.from_schema(schema_id=1, schema=schema2))
- # behavior filter
- splits = self._scan_table(table1.new_read_builder())
-
- read_builder = table2.new_read_builder()
- predicate_builder = read_builder.new_predicate_builder()
- predicate = predicate_builder.not_equal('behavior', "g")
- splits2 = self._scan_table(read_builder.with_filter(predicate))
- for split in splits2:
- for file in split.files:
- file.schema_id = 1
- splits.extend(splits2)
- table_read = read_builder.new_read()
- actual = table_read.to_arrow(splits)
- # 'behavior' is not included in the file. In order to filter more conservatively, we choose to discard the
- # filtering criteria for 'behavior'
- expected = pa.Table.from_pydict({
- 'user_id': [1, 2, 4, 3, 5, 8, 6],
- 'item_id': [1001, 1002, 1004, 1003, 1005, 1008, 1006],
- 'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p1"],
- 'behavior': [None, None, None, None, "e", "h", "f"],
- }, schema=pa_schema)
- self.assertEqual(expected, actual)
- # user_id filter
splits = self._scan_table(table1.new_read_builder())
-
read_builder = table2.new_read_builder()
- predicate_builder = read_builder.new_predicate_builder()
- predicate = predicate_builder.less_than('user_id', 6)
- splits2 = self._scan_table(read_builder.with_filter(predicate))
- self.assertEqual(1, len(splits2))
- for split in splits2:
- for file in split.files:
- file.schema_id = 1
+ splits2 = self._scan_table(read_builder)
splits.extend(splits2)
table_read = read_builder.new_read()
actual = table_read.to_arrow(splits)
expected = pa.Table.from_pydict({
- 'user_id': [1, 2, 4, 3, 5],
- 'item_id': [1001, 1002, 1004, 1003, 1005],
- 'dt': ["p1", "p1", "p1", "p2", "p2"],
- 'behavior': [None, None, None, None, "e"],
+ 'user_id': [1, 2, 4, 3, 5, 7, 8, 6],
+ 'time': [pandas.Timestamp("2025-01-01 00:00:00"), pandas.Timestamp("2025-01-02 00:02:00"),
+ pandas.Timestamp("2025-01-04 00:04:00"), pandas.Timestamp("2025-01-03 00:03:00"),
+ pandas.Timestamp("2025-01-05 00:05:00"), pandas.Timestamp("2025-01-07 00:07:00"),
+ pandas.Timestamp("2025-01-08 00:08:00"), pandas.Timestamp("2025-01-06 00:06:00")],
+ 'dt': ["p1", "p1", "p1", "p2", "p2", "p2", "p2", "p1"],
+ 'behavior': [None, None, None, None, "e", "g", "h", "f"],
}, schema=pa_schema)
self.assertEqual(expected, actual)
@@ -292,6 +268,131 @@ class SchemaEvolutionReadTest(unittest.TestCase):
entries = new_scan.starting_scanner.read_manifest_entries(manifest_files)
self.assertEqual(1, len(entries))  # verify scan filter success for schema evolution
+ def test_schema_evolution_with_read_filter(self):
+ # schema 0
+ pa_schema = pa.schema([
+ ('user_id', pa.int64()),
+ ('item_id', pa.int64()),
+ ('dt', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ self.catalog.create_table('default.test_schema_evolution_with_filter', schema, False)
+ table1 = self.catalog.get_table('default.test_schema_evolution_with_filter')
+ write_builder = table1.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data1 = {
+ 'user_id': [1, 2, 3, 4],
+ 'item_id': [1001, 1002, 1003, 1004],
+ 'dt': ['p1', 'p1', 'p2', 'p1'],
+ }
+ pa_table = pa.Table.from_pydict(data1, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # schema 1: add behavior column
+ pa_schema = pa.schema([
+ ('user_id', pa.int64()),
+ ('item_id', pa.int64()),
+ ('dt', pa.string()),
+ ('behavior', pa.string())
+ ])
+ schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ self.catalog.create_table('default.test_schema_evolution_with_filter2', schema2, False)
+ table2 = self.catalog.get_table('default.test_schema_evolution_with_filter2')
+ table2.table_schema.id = 1
+ write_builder = table2.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data2 = {
+ 'user_id': [5, 6, 7, 8],
+ 'item_id': [1005, 1006, 1007, 1008],
+ 'dt': ['p2', 'p1', 'p2', 'p2'],
+ 'behavior': ['e', 'f', 'g', 'h'],
+ }
+ pa_table = pa.Table.from_pydict(data2, schema=pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ # write schema-0 and schema-1 to table2
+ schema_manager = SchemaManager(table2.file_io, table2.table_path)
+ schema_manager.commit(TableSchema.from_schema(schema_id=0, schema=schema))
+ schema_manager.commit(TableSchema.from_schema(schema_id=1, schema=schema2))
+
+ # behavior or user_id filter
+ splits = self._scan_table(table1.new_read_builder())
+ read_builder = table2.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ eq_predicate = predicate_builder.equal('behavior', "g")
+ lt_predicate = predicate_builder.less_than('user_id', 6)
+ or_predicate = predicate_builder.or_predicates([eq_predicate, lt_predicate])
+ splits2 = self._scan_table(read_builder.with_filter(or_predicate))
+ for split in splits2:
+ for file in split.files:
+ file.schema_id = 1
+ splits.extend(splits2)
+
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(splits)
+ expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 4, 3, 5, 7],
+ 'item_id': [1001, 1002, 1004, 1003, 1005, 1007],
+ 'dt': ["p1", "p1", "p1", "p2", "p2", "p2"],
+ 'behavior': [None, None, None, None, "e", "g"],
+ }, schema=pa_schema)
+ self.assertEqual(expected, actual)
+
+ # behavior and user_id filter
+ splits = self._scan_table(table1.new_read_builder())
+
+ read_builder = table2.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ eq_predicate = predicate_builder.equal('behavior', "g")
+ lt_predicate = predicate_builder.less_than('user_id', 8)
+ and_predicate = predicate_builder.and_predicates([eq_predicate, lt_predicate])
+ splits2 = self._scan_table(read_builder.with_filter(and_predicate))
+ for split in splits2:
+ for file in split.files:
+ file.schema_id = 1
+ splits.extend(splits2)
+
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(splits)
+ expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 4, 3, 7],
+ 'item_id': [1001, 1002, 1004, 1003, 1007],
+ 'dt': ["p1", "p1", "p1", "p2", "p2"],
+ 'behavior': [None, None, None, None, "g"],
+ }, schema=pa_schema)
+ self.assertEqual(expected, actual)
+
+ # user_id filter
+ splits = self._scan_table(table1.new_read_builder())
+
+ read_builder = table2.new_read_builder()
+ predicate_builder = read_builder.new_predicate_builder()
+ predicate = predicate_builder.less_than('user_id', 6)
+ splits2 = self._scan_table(read_builder.with_filter(predicate))
+ self.assertEqual(1, len(splits2))
+ for split in splits2:
+ for file in split.files:
+ file.schema_id = 1
+ splits.extend(splits2)
+
+ table_read = read_builder.new_read()
+ actual = table_read.to_arrow(splits)
+ expected = pa.Table.from_pydict({
+ 'user_id': [1, 2, 4, 3, 5],
+ 'item_id': [1001, 1002, 1004, 1003, 1005],
+ 'dt': ["p1", "p1", "p1", "p2", "p2"],
+ 'behavior': [None, None, None, None, "e"],
+ }, schema=pa_schema)
+ self.assertEqual(expected, actual)
+
def _write_test_table(self, table):
write_builder = table.new_batch_write_builder()
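
For reference, the write-then-filtered-read pattern these tests exercise looks roughly like the sketch below. It is an illustration, not part of the commit: the warehouse path and table name are placeholders, and the new_scan().plan().splits() chain stands in for the test's private _scan_table helper, assuming the usual pypaimon scan API.

    import pyarrow as pa
    from pypaimon import CatalogFactory, Schema

    # Placeholder warehouse; any writable local path works.
    catalog = CatalogFactory.create({'warehouse': '/tmp/paimon-warehouse'})
    catalog.create_database('default', False)

    pa_schema = pa.schema([('user_id', pa.int64()), ('dt', pa.string())])
    schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
    catalog.create_table('default.demo', schema, False)
    table = catalog.get_table('default.demo')

    # Write one batch through the batch write builder.
    write_builder = table.new_batch_write_builder()
    table_write = write_builder.new_write()
    table_commit = write_builder.new_commit()
    table_write.write_arrow(pa.Table.from_pydict(
        {'user_id': [1, 2, 3], 'dt': ['p1', 'p1', 'p2']}, schema=pa_schema))
    table_commit.commit(table_write.prepare_commit())
    table_write.close()
    table_commit.close()

    # Read back with a pushed-down predicate; on a schema-evolved table,
    # predicates on columns absent from older files are dropped rather
    # than filtering rows away (the conservative behavior the tests check).
    read_builder = table.new_read_builder()
    predicate = read_builder.new_predicate_builder().less_than('user_id', 3)
    read_builder = read_builder.with_filter(predicate)
    splits = read_builder.new_scan().plan().splits()
    result = read_builder.new_read().to_arrow(splits)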