This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 662a56662a [python] fix value_stats containing system fields for primary key tables (#6945)
662a56662a is described below

commit 662a56662aab574b6a10ac696a8a6c4d3c7a90c1
Author: XiaoHongbo <[email protected]>
AuthorDate: Sat Jan 3 20:53:01 2026 +0800

    [python] fix value_stats containing system fields for primary key tables (#6945)
---
 .../paimon/table/PrimaryKeySimpleTableTest.java    |  29 +++++
 paimon-python/pypaimon/tests/reader_base_test.py   | 132 +++++++++++++++++++++
 paimon-python/pypaimon/write/writer/data_writer.py |   7 +-
 3 files changed, 166 insertions(+), 2 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 3b9d7d2831..ef0ab9de24 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -101,6 +101,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -142,6 +143,7 @@ import static 
org.apache.paimon.predicate.PredicateBuilder.and;
 import static org.apache.paimon.predicate.SortValue.NullOrdering.NULLS_LAST;
 import static org.apache.paimon.predicate.SortValue.SortDirection.ASCENDING;
 import static org.apache.paimon.predicate.SortValue.SortDirection.DESCENDING;
+import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -248,6 +250,33 @@ public class PrimaryKeySimpleTableTest extends 
SimpleTableTestBase {
         file = ((DataSplit) 
table.newScan().plan().splits().get(0)).dataFiles().get(0);
         assertThat(file.level()).isEqualTo(5);
         
assertThat(file.valueStats().maxValues().getFieldCount()).isGreaterThan(4);
+
+        if (file.valueStatsCols() == null) {
+            int expectedFieldCount = table.schema().fields().size();
+            int actualFieldCount = 
file.valueStats().minValues().getFieldCount();
+            assertThat(actualFieldCount)
+                    .as(
+                            "When value_stats_cols is null, value_stats field 
count should match table.fields count. "
+                                    + "This ensures value_stats does NOT 
contain system fields.")
+                    .isEqualTo(expectedFieldCount);
+        } else {
+            for (String fieldName : 
Objects.requireNonNull(file.valueStatsCols())) {
+                boolean isSystemField =
+                        fieldName.startsWith(KEY_FIELD_PREFIX)
+                                || SpecialFields.isSystemField(fieldName);
+                assertThat(isSystemField)
+                        .as("value_stats_cols should NOT contain system field: 
" + fieldName)
+                        .isFalse();
+            }
+            assertThat(file.valueStats().minValues().getFieldCount())
+                    .as("value_stats field count should match value_stats_cols 
size")
+                    
.isEqualTo(Objects.requireNonNull(file.valueStatsCols()).size());
+        }
+
+        assertThat(file.valueStats().minValues().getFieldCount())
+                .isEqualTo(file.valueStats().maxValues().getFieldCount());
+        assertThat(file.valueStats().nullCounts().size())
+                .isEqualTo(file.valueStats().minValues().getFieldCount());
     }
 
     @Test
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 92a275585c..81a7a5baf9 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -471,6 +471,81 @@ class ReaderBasicTest(unittest.TestCase):
             test_name="specific_case"
         )
 
+        schema_with_stats = Schema.from_pyarrow_schema(pa_schema, 
options={'metadata.stats-mode': 'full'})
+        catalog.create_table("test_db.test_value_stats_cols_schema_match", 
schema_with_stats, False)
+        table_with_stats = 
catalog.get_table("test_db.test_value_stats_cols_schema_match")
+        self._test_append_only_schema_match_case(table_with_stats, pa_schema)
+
+    def test_primary_key_value_stats_excludes_system_fields(self):
+        catalog = CatalogFactory.create({
+            "warehouse": self.warehouse
+        })
+        catalog.create_database("test_db_system_fields", True)
+
+        pk_pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('price', pa.float64()),
+        ])
+        pk_schema = Schema.from_pyarrow_schema(
+            pk_pa_schema,
+            primary_keys=['id'],
+            options={'metadata.stats-mode': 'full', 'bucket': '2'}
+        )
+        
catalog.create_table("test_db_system_fields.test_pk_value_stats_system_fields", 
pk_schema, False)
+        pk_table = 
catalog.get_table("test_db_system_fields.test_pk_value_stats_system_fields")
+
+        pk_test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+        }, schema=pk_pa_schema)
+
+        pk_write_builder = pk_table.new_batch_write_builder()
+        pk_writer = pk_write_builder.new_write()
+        pk_writer.write_arrow(pk_test_data)
+        pk_commit_messages = pk_writer.prepare_commit()
+        pk_commit = pk_write_builder.new_commit()
+        pk_commit.commit(pk_commit_messages)
+        pk_writer.close()
+
+        pk_read_builder = pk_table.new_read_builder()
+        pk_table_scan = pk_read_builder.new_scan()
+        latest_snapshot = SnapshotManager(pk_table).get_latest_snapshot()
+        pk_manifest_files = 
pk_table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        pk_manifest_entries = 
pk_table_scan.starting_scanner.manifest_file_manager.read(
+            pk_manifest_files[0].file_name,
+            lambda row: 
pk_table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        self.assertGreater(len(pk_manifest_entries), 0, "Should have at least 
one manifest entry")
+        pk_file_meta = pk_manifest_entries[0].file
+
+        pk_table_field_names = {f.name for f in pk_table.fields}
+        system_fields = {'_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 
'_ROW_ID'}
+        pk_table_has_system_fields = bool(pk_table_field_names & system_fields)
+        self.assertFalse(pk_table_has_system_fields,
+                         f"table.fields should NOT contain system fields, but 
got: {pk_table_field_names}")
+
+        if pk_file_meta.value_stats_cols is None:
+            pk_value_stats_fields = 
pk_table_scan.starting_scanner.manifest_file_manager._get_value_stats_fields(
+                {'_VALUE_STATS_COLS': None},
+                pk_table.fields
+            )
+            expected_count = len(pk_value_stats_fields)
+            actual_count = pk_file_meta.value_stats.min_values.arity
+            self.assertEqual(actual_count, expected_count,
+                             f"Field count mismatch: value_stats has 
{actual_count} fields, "
+                             f"but table.fields has {expected_count} fields. "
+                             f"This indicates value_stats contains system 
fields that are not in table.fields.")
+        else:
+            for field_name in pk_file_meta.value_stats_cols:
+                is_system_field = (field_name.startswith('_KEY_') or
+                                   field_name in ['_SEQUENCE_NUMBER', 
'_VALUE_KIND', '_ROW_ID'])
+                self.assertFalse(is_system_field,
+                                 f"value_stats_cols should not contain system 
field: {field_name}")
+
     def test_types(self):
         data_fields = [
             DataField(0, "f0", AtomicType('TINYINT'), 'desc'),
@@ -700,6 +775,63 @@ class ReaderBasicTest(unittest.TestCase):
 
         self.assertEqual(read_entry.file.value_stats.null_counts, null_counts)
 
+    def _test_append_only_schema_match_case(self, table, pa_schema):
+        """Test that for append-only tables, data.schema matches table.fields.
+
+        This verifies the assumption in data_writer.py that for append-only 
tables,
+        PyarrowFieldParser.to_paimon_schema(data.schema) should have the same 
fields
+        as self.table.fields (same count and same field names).
+        """
+        self.assertFalse(table.is_primary_key_table,
+                         "Table should be append-only (no primary keys)")
+
+        test_data = pa.Table.from_pydict({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'price': [10.5, 20.3, 30.7],
+            'category': ['A', 'B', 'C'],
+        }, schema=pa_schema)
+
+        write_builder = table.new_batch_write_builder()
+        writer = write_builder.new_write()
+        writer.write_arrow(test_data)
+        commit_messages = writer.prepare_commit()
+        commit = write_builder.new_commit()
+        commit.commit(commit_messages)
+        writer.close()
+
+        # Verify that data.schema (converted to paimon schema) matches 
table.fields
+        data_fields_from_schema = 
PyarrowFieldParser.to_paimon_schema(test_data.schema)
+        table_fields = table.fields
+
+        # Verify field count matches
+        self.assertEqual(len(data_fields_from_schema), len(table_fields),
+                         f"Field count mismatch: data.schema has 
{len(data_fields_from_schema)} fields, "
+                         f"but table.fields has {len(table_fields)} fields")
+
+        # Verify field names match (order may differ, but names should match)
+        data_field_names = {field.name for field in data_fields_from_schema}
+        table_field_names = {field.name for field in table_fields}
+        self.assertEqual(data_field_names, table_field_names,
+                         f"Field names mismatch: data.schema has 
{data_field_names}, "
+                         f"but table.fields has {table_field_names}")
+
+        # Read manifest to verify value_stats_cols is None (all fields 
included)
+        read_builder = table.new_read_builder()
+        table_scan = read_builder.new_scan()
+        latest_snapshot = SnapshotManager(table).get_latest_snapshot()
+        manifest_files = 
table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
+        manifest_entries = 
table_scan.starting_scanner.manifest_file_manager.read(
+            manifest_files[0].file_name,
+            lambda row: 
table_scan.starting_scanner._filter_manifest_entry(row),
+            False
+        )
+
+        if len(manifest_entries) > 0:
+            file_meta = manifest_entries[0].file
+            self.assertIsNone(file_meta.value_stats_cols,
+                              "value_stats_cols should be None when all table 
fields are included")
+
     def test_split_target_size(self):
         """Test source.split.target-size configuration effect on split 
generation."""
         from pypaimon.common.options.core_options import CoreOptions
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index 73609ed912..38dd58c15e 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -192,8 +192,11 @@ class DataWriter(ABC):
 
         # key stats & value stats
         value_stats_enabled = self.options.metadata_stats_enabled()
-        stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if 
value_stats_enabled\
-            else self.table.trimmed_primary_keys_fields
+        if value_stats_enabled:
+            stats_fields = self.table.fields if 
self.table.is_primary_key_table \
+                else PyarrowFieldParser.to_paimon_schema(data.schema)
+        else:
+            stats_fields = self.table.trimmed_primary_keys_fields
         column_stats = {
             field.name: self._get_column_stats(data, field.name)
             for field in stats_fields

Reply via email to