This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f0d1eb428b [python] fix column nullability info lost issue during 
schema rebuilding (#7174)
f0d1eb428b is described below

commit f0d1eb428b9afb202a5997329519e533a11b9c4a
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Feb 2 19:16:42 2026 +0800

    [python] fix column nullability info lost issue during schema rebuilding 
(#7174)
---
 .../pypaimon/read/reader/concat_batch_reader.py    | 15 +++++---
 .../pypaimon/read/reader/data_file_batch_reader.py | 13 ++++++-
 .../pypaimon/read/reader/format_pyarrow_reader.py  |  6 ++--
 paimon-python/pypaimon/read/split_read.py          |  5 +--
 .../pypaimon/tests/data_evolution_test.py          | 42 +++++++++++++++++++---
 .../pypaimon/write/writer/key_value_data_writer.py | 25 ++++++-------
 6 files changed, 80 insertions(+), 26 deletions(-)

diff --git a/paimon-python/pypaimon/read/reader/concat_batch_reader.py 
b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
index b15a37f79f..7bf884a88e 100644
--- a/paimon-python/pypaimon/read/reader/concat_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/concat_batch_reader.py
@@ -113,7 +113,7 @@ class MergeAllBatchReader(RecordBatchReader):
                         combined_arrays.append(pa.concat_arrays(column_arrays))
                     self.merged_batch = pa.RecordBatch.from_arrays(
                         combined_arrays,
-                        names=all_concatenated_batches[0].schema.names
+                        schema=all_concatenated_batches[0].schema
                     )
         else:
             self.merged_batch = None
@@ -141,7 +141,13 @@ class DataEvolutionMergeReader(RecordBatchReader):
      - The sixth field comes from batch1, and it is at offset 0 in batch1.
     """
 
-    def __init__(self, row_offsets: List[int], field_offsets: List[int], 
readers: List[Optional[RecordBatchReader]]):
+    def __init__(
+        self,
+        row_offsets: List[int],
+        field_offsets: List[int],
+        readers: List[Optional[RecordBatchReader]],
+        schema: pa.Schema,
+    ):
         if row_offsets is None:
             raise ValueError("Row offsets must not be null")
         if field_offsets is None:
@@ -155,6 +161,7 @@ class DataEvolutionMergeReader(RecordBatchReader):
         self.row_offsets = row_offsets
         self.field_offsets = field_offsets
         self.readers = readers
+        self.schema = schema
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         batches: List[Optional[RecordBatch]] = [None] * len(self.readers)
@@ -167,16 +174,14 @@ class DataEvolutionMergeReader(RecordBatchReader):
                 batches[i] = batch
         # Assemble record batches from batches based on row_offsets and 
field_offsets
         columns = []
-        names = []
         for i in range(len(self.row_offsets)):
             batch_index = self.row_offsets[i]
             field_index = self.field_offsets[i]
             if batches[batch_index] is not None:
                 column = batches[batch_index].column(field_index)
                 columns.append(column)
-                names.append(batches[batch_index].schema.names[field_index])
         if columns:
-            return pa.RecordBatch.from_arrays(columns, names)
+            return pa.RecordBatch.from_arrays(columns, schema=self.schema)
         return None
 
     def close(self) -> None:
diff --git a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py 
b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
index 014d4f9da3..c9e51785a0 100644
--- a/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_batch_reader.py
@@ -135,7 +135,18 @@ class DataFileBatchReader(RecordBatchReader):
             # Create a new array that fills with max_sequence_number
             arrays[idx] = pa.repeat(self.max_sequence_number, 
record_batch.num_rows)
 
-        return pa.RecordBatch.from_arrays(arrays, 
names=record_batch.schema.names)
+        names = record_batch.schema.names
+        table = None
+        for i, name in enumerate(names):
+            field = pa.field(
+                name, arrays[i].type,
+                nullable=record_batch.schema.field(name).nullable
+            )
+            if table is None:
+                table = pa.table({name: arrays[i]}, schema=pa.schema([field]))
+            else:
+                table = table.append_column(field, arrays[i])
+        return table.to_batches()[0]
 
     def close(self) -> None:
         self.format_reader.close()
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py 
b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index 699ff48477..dd5330227d 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -62,18 +62,20 @@ class FormatPyArrowReader(RecordBatchReader):
 
             # Reconstruct the batch with all fields in the correct order
             all_columns = []
+            out_fields = []
             for field_name in self.read_fields:
                 if field_name in self.existing_fields:
                     # Get the column from the existing batch
                     column_idx = self.existing_fields.index(field_name)
                     all_columns.append(batch.column(column_idx))
+                    out_fields.append(batch.schema.field(column_idx))
                 else:
                     # Get the column from missing fields
                     column_idx = self.missing_fields.index(field_name)
                     all_columns.append(missing_columns[column_idx])
-
+                    out_fields.append(pa.field(field_name, pa.null(), 
nullable=True))
             # Create a new RecordBatch with all columns
-            return pa.RecordBatch.from_arrays(all_columns, 
names=self.read_fields)
+            return pa.RecordBatch.from_arrays(all_columns, 
schema=pa.schema(out_fields))
 
         except StopIteration:
             return None
diff --git a/paimon-python/pypaimon/read/split_read.py 
b/paimon-python/pypaimon/read/split_read.py
index eab279cf9c..efd43f4e3e 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -52,7 +52,7 @@ from pypaimon.read.reader.shard_batch_reader import 
ShardBatchReader
 from pypaimon.read.reader.sort_merge_reader import SortMergeReaderWithMinHeap
 from pypaimon.read.split import Split
 from pypaimon.read.sliced_split import SlicedSplit
-from pypaimon.schema.data_types import DataField
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 from pypaimon.table.special_fields import SpecialFields
 
 KEY_PREFIX = "_KEY_"
@@ -571,7 +571,8 @@ class DataEvolutionSplitRead(SplitRead):
                 if not field.type.nullable:
                     raise ValueError(f"Field {field} is not null but can't 
find any file contains it.")
 
-        return DataEvolutionMergeReader(row_offsets, field_offsets, 
file_record_readers)
+        output_schema = PyarrowFieldParser.from_paimon_schema(all_read_fields)
+        return DataEvolutionMergeReader(row_offsets, field_offsets, 
file_record_readers, schema=output_schema)
 
     def _create_file_reader(self, file: DataFileMeta, read_fields: [str]) -> 
Optional[RecordReader]:
         """Create a file reader for a single file."""
diff --git a/paimon-python/pypaimon/tests/data_evolution_test.py 
b/paimon-python/pypaimon/tests/data_evolution_test.py
index 1ffb7dbcc4..436b99e4df 100644
--- a/paimon-python/pypaimon/tests/data_evolution_test.py
+++ b/paimon-python/pypaimon/tests/data_evolution_test.py
@@ -483,6 +483,20 @@ class DataEvolutionTest(unittest.TestCase):
         }, schema=simple_pa_schema)
         self.assertEqual(actual, expect)
 
+        read_builder = table.new_read_builder()
+        read_builder.with_projection(['f0', 'f1', 'f2', '_ROW_ID', 
'_SEQUENCE_NUMBER'])
+        table_scan = read_builder.new_scan()
+        table_read = read_builder.new_read()
+        actual_with_meta = table_read.to_arrow(table_scan.plan().splits())
+        self.assertFalse(
+            actual_with_meta.schema.field('_ROW_ID').nullable,
+            '_ROW_ID must be non-nullable per SpecialFields',
+        )
+        self.assertFalse(
+            actual_with_meta.schema.field('_SEQUENCE_NUMBER').nullable,
+            '_SEQUENCE_NUMBER must be non-nullable per SpecialFields',
+        )
+
     def test_more_data(self):
         simple_pa_schema = pa.schema([
             ('f0', pa.int32()),
@@ -580,9 +594,9 @@ class DataEvolutionTest(unittest.TestCase):
             '_SEQUENCE_NUMBER': [1, 1],
         }, schema=pa.schema([
             ('f0', pa.int8()),
-            ('_ROW_ID', pa.int64()),
+            pa.field('_ROW_ID', pa.int64(), nullable=False),
             ('f1', pa.int16()),
-            ('_SEQUENCE_NUMBER', pa.int64()),
+            pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
         ]))
         self.assertEqual(actual_data, expect_data)
 
@@ -606,6 +620,8 @@ class DataEvolutionTest(unittest.TestCase):
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
         actual_data = table_read.to_arrow(table_scan.plan().splits())
+        self.assertFalse(actual_data.schema.field('_ROW_ID').nullable)
+        self.assertFalse(actual_data.schema.field('_SEQUENCE_NUMBER').nullable)
         expect_data = pa.Table.from_pydict({
             'f0': [3, 4],
             'f1': [-1001, 1002],
@@ -614,7 +630,25 @@ class DataEvolutionTest(unittest.TestCase):
         }, schema=pa.schema([
             ('f0', pa.int8()),
             ('f1', pa.int16()),
-            ('_ROW_ID', pa.int64()),
-            ('_SEQUENCE_NUMBER', pa.int64()),
+            pa.field('_ROW_ID', pa.int64(), nullable=False),
+            pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
         ]))
         self.assertEqual(actual_data, expect_data)
+
+    def test_from_arrays_without_schema(self):
+        schema = pa.schema([
+            ('f0', pa.int8()),
+            pa.field('_ROW_ID', pa.int64(), nullable=False),
+            pa.field('_SEQUENCE_NUMBER', pa.int64(), nullable=False),
+        ])
+        batch = pa.RecordBatch.from_pydict(
+            {'f0': [1], '_ROW_ID': [0], '_SEQUENCE_NUMBER': [1]},
+            schema=schema
+        )
+        self.assertFalse(batch.schema.field('_ROW_ID').nullable)
+        self.assertFalse(batch.schema.field('_SEQUENCE_NUMBER').nullable)
+
+        arrays = list(batch.columns)
+        rebuilt = pa.RecordBatch.from_arrays(arrays, names=batch.schema.names)
+        self.assertTrue(rebuilt.schema.field('_ROW_ID').nullable)
+        self.assertTrue(rebuilt.schema.field('_SEQUENCE_NUMBER').nullable)
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py 
b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index 3165a01069..1b2dde7d1d 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -36,30 +36,31 @@ class KeyValueDataWriter(DataWriter):
     def _add_system_fields(self, data: pa.RecordBatch) -> pa.RecordBatch:
         """Add system fields: _KEY_{pk_key}, _SEQUENCE_NUMBER, _VALUE_KIND."""
         num_rows = data.num_rows
-        
+
         new_arrays = []
-        new_names = []
-        
+        new_fields = []
+
         for pk_key in self.trimmed_primary_keys:
             if pk_key in data.schema.names:
                 key_column = data.column(pk_key)
                 new_arrays.append(key_column)
-                new_names.append(f'_KEY_{pk_key}')
-        
+                src_field = data.schema.field(pk_key)
+                new_fields.append(pa.field(f'_KEY_{pk_key}', src_field.type, 
nullable=src_field.nullable))
+
         sequence_column = pa.array([self.sequence_generator.next() for _ in 
range(num_rows)], type=pa.int64())
         new_arrays.append(sequence_column)
-        new_names.append('_SEQUENCE_NUMBER')
-        
+        new_fields.append(pa.field('_SEQUENCE_NUMBER', pa.int64(), 
nullable=False))
+
         # TODO: support real row kind here
         value_kind_column = pa.array([0] * num_rows, type=pa.int8())
         new_arrays.append(value_kind_column)
-        new_names.append('_VALUE_KIND')
-        
+        new_fields.append(pa.field('_VALUE_KIND', pa.int8(), nullable=False))
+
         for i in range(data.num_columns):
             new_arrays.append(data.column(i))
-            new_names.append(data.schema.names[i])
-        
-        return pa.RecordBatch.from_arrays(new_arrays, names=new_names)
+            new_fields.append(data.schema.field(i))
+
+        return pa.RecordBatch.from_arrays(new_arrays, 
schema=pa.schema(new_fields))
 
     def _sort_by_primary_key(self, data: pa.RecordBatch) -> pa.RecordBatch:
         sort_keys = [(key, 'ascending') for key in self.trimmed_primary_keys]

Reply via email to