This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 24e08a4a78 [python] Let Python write file without value stats by default (#6940)
24e08a4a78 is described below
commit 24e08a4a78593622c8ad78e2f919580edddb394e
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Dec 31 22:00:38 2025 +0800
[python] Let Python write file without value stats by default (#6940)
---
.../pypaimon/common/options/core_options.py | 10 +++++++
.../pypaimon/manifest/schema/simple_stats.py | 2 +-
paimon-python/pypaimon/tests/predicates_test.py | 2 +-
.../pypaimon/tests/py36/ao_predicate_test.py | 2 +-
.../pypaimon/tests/py36/rest_ao_read_write_test.py | 35 +++++++++++++++-------
paimon-python/pypaimon/tests/reader_base_test.py | 33 ++++++++++++++------
.../pypaimon/tests/schema_evolution_read_test.py | 8 +++--
.../pypaimon/write/writer/data_blob_writer.py | 30 +++++++++----------
paimon-python/pypaimon/write/writer/data_writer.py | 17 ++++++-----
9 files changed, 91 insertions(+), 48 deletions(-)
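For context, the updated tests below opt back into full value stats per table. A minimal sketch of that usage, assuming the `Schema` import path and a pre-existing `catalog` object from the surrounding test setup (neither is shown in this diff):

    import pyarrow as pa
    from pypaimon import Schema  # assumed import path

    pa_schema = pa.schema([('f0', pa.int64()), ('f1', pa.string())])

    # With this commit, value stats default to 'none'; pass 'full' explicitly to keep them.
    schema = Schema.from_pyarrow_schema(pa_schema, options={'metadata.stats-mode': 'full'})
    # catalog.create_table('default.test_append', schema, False)  # 'catalog' from test setup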
diff --git a/paimon-python/pypaimon/common/options/core_options.py b/paimon-python/pypaimon/common/options/core_options.py
index 10ff120d75..4ab5a253d7 100644
--- a/paimon-python/pypaimon/common/options/core_options.py
+++ b/paimon-python/pypaimon/common/options/core_options.py
@@ -153,6 +153,13 @@ class CoreOptions:
.with_description("Define the data block size.")
)
+ METADATA_STATS_MODE: ConfigOption[str] = (
+ ConfigOptions.key("metadata.stats-mode")
+ .string_type()
+ .default_value("none")
+ .with_description("Stats Mode, Python by default is none. Java is
truncate(16).")
+ )
+
BLOB_AS_DESCRIPTOR: ConfigOption[bool] = (
ConfigOptions.key("blob-as-descriptor")
.boolean_type()
@@ -317,6 +324,9 @@ class CoreOptions:
def file_block_size(self, default=None):
return self.options.get(CoreOptions.FILE_BLOCK_SIZE, default)
+ def metadata_stats_enabled(self, default=None):
+ return self.options.get(CoreOptions.METADATA_STATS_MODE, default) == "full"
+
def blob_as_descriptor(self, default=None):
return self.options.get(CoreOptions.BLOB_AS_DESCRIPTOR, default)
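The new accessor treats any mode other than "full" as disabled. A standalone model of that semantics, with only the == "full" rule taken from the change above (the dict-based signature here is a simplification, not the real CoreOptions API):

    # Simplified model; the real accessor reads CoreOptions.METADATA_STATS_MODE.
    def metadata_stats_enabled(options: dict) -> bool:
        return options.get('metadata.stats-mode', 'none') == 'full'

    assert metadata_stats_enabled({'metadata.stats-mode': 'full'}) is True
    assert metadata_stats_enabled({}) is False  # new Python default: 'none'
    assert metadata_stats_enabled({'metadata.stats-mode': 'truncate(16)'}) is False  # Java-style modes also read as disabled here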
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index 10d9e62420..fc16d351b7 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -37,7 +37,7 @@ class SimpleStats:
if cls._empty_stats is None:
min_values = GenericRow([], [])
max_values = GenericRow([], [])
- cls._empty_stats = cls(min_values, max_values, None)
+ cls._empty_stats = cls(min_values, max_values, [])
return cls._empty_stats
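With this change the shared empty-stats singleton carries an empty null-counts list instead of None, so all three fields round-trip consistently for files written without stats. A small sketch, assuming the import path implied by the diffstat and that the singleton is exposed via a classmethod such as SimpleStats.empty():

    from pypaimon.manifest.schema.simple_stats import SimpleStats  # assumed import path

    empty = SimpleStats.empty()
    # min/max are empty GenericRows, and the null-counts field is now []
    # rather than None, matching what the updated tests deserialize below.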
diff --git a/paimon-python/pypaimon/tests/predicates_test.py b/paimon-python/pypaimon/tests/predicates_test.py
index 6e6de2fcae..f54a18dd93 100644
--- a/paimon-python/pypaimon/tests/predicates_test.py
+++ b/paimon-python/pypaimon/tests/predicates_test.py
@@ -52,7 +52,7 @@ class PredicateTest(unittest.TestCase):
('f1', pa.string()),
])
cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
- pa_schema, options={'file.format': _random_format()}), False)
+ pa_schema, options={'file.format': _random_format(), 'metadata.stats-mode': 'full'}), False)
cls.catalog.create_table('default.test_pk', Schema.from_pyarrow_schema(
pa_schema, primary_keys=['f0'], options={'bucket': '1', 'file.format': _random_format()}), False)
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
index b06a69baa8..a3b3e0ec9f 100644
--- a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -40,7 +40,7 @@ class AOPredicatePy36Test(unittest.TestCase):
('f1', pa.string()),
])
cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
- pa_schema, options={'file.format': _random_format()}), False)
+ pa_schema, options={'file.format': _random_format(), 'metadata.stats-mode': 'full'}), False)
df = pd.DataFrame({
'f0': [1, 2, 3, 4, 5],
diff --git a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
index cb85de28b5..7f27b15b31 100644
--- a/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/rest_ao_read_write_test.py
@@ -17,6 +17,7 @@ limitations under the License.
"""
import logging
import time
+import random
from datetime import date
from decimal import Decimal
from unittest.mock import Mock
@@ -143,7 +144,9 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
('f10', pa.decimal128(10, 2)),
('f11', pa.date32()),
])
- schema = Schema.from_pyarrow_schema(simple_pa_schema)
+ stats_enabled = random.random() < 0.5
+ options = {'metadata.stats-mode': 'full'} if stats_enabled else {}
+ schema = Schema.from_pyarrow_schema(simple_pa_schema, options=options)
self.rest_catalog.create_table('default.test_full_data_types', schema, False)
table = self.rest_catalog.get_table('default.test_full_data_types')
@@ -183,14 +186,25 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
manifest_files[0].file_name,
lambda row: table_scan.starting_scanner._filter_manifest_entry(row),
drop_stats=False)
- min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
- table.fields).values
- max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
- table.fields).values
- expected_min_values = [col[0].as_py() for col in expect_data]
- expected_max_values = [col[1].as_py() for col in expect_data]
- self.assertEqual(min_value_stats, expected_min_values)
- self.assertEqual(max_value_stats, expected_max_values)
+ # Python write does not produce value stats by default
+ if stats_enabled:
+ self.assertEqual(manifest_entries[0].file.value_stats_cols, None)
+ min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+ table.fields).values
+ max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+ table.fields).values
+ expected_min_values = [col[0].as_py() for col in expect_data]
+ expected_max_values = [col[1].as_py() for col in expect_data]
+ self.assertEqual(min_value_stats, expected_min_values)
+ self.assertEqual(max_value_stats, expected_max_values)
+ else:
+ self.assertEqual(manifest_entries[0].file.value_stats_cols, [])
+ min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+ []).values
+ max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+ []).values
+ self.assertEqual(min_value_stats, [])
+ self.assertEqual(max_value_stats, [])
def test_mixed_add_and_delete_entries_same_partition(self):
"""Test record_count calculation with mixed ADD/DELETE entries in same
partition."""
@@ -458,7 +472,8 @@ class RESTAOReadWritePy36Test(RESTBaseTest):
self.assertEqual(result.to_dict(), test_df.to_dict())
def test_append_only_reader_with_filter(self):
- schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ options = {'metadata.stats-mode': 'full'}
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options=options)
self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
table = self.rest_catalog.get_table('default.test_append_only_filter')
self._write_test_table(table)
diff --git a/paimon-python/pypaimon/tests/reader_base_test.py b/paimon-python/pypaimon/tests/reader_base_test.py
index 1455d9ad31..92a275585c 100644
--- a/paimon-python/pypaimon/tests/reader_base_test.py
+++ b/paimon-python/pypaimon/tests/reader_base_test.py
@@ -21,6 +21,7 @@ import os
import shutil
import tempfile
import unittest
+import random
from datetime import date, datetime, time
from decimal import Decimal
from unittest.mock import Mock
@@ -178,7 +179,9 @@ class ReaderBasicTest(unittest.TestCase):
('f12', pa.date32()),
('f13', pa.time64('us')),
])
- schema = Schema.from_pyarrow_schema(simple_pa_schema)
+ stats_enabled = random.random() < 0.5
+ options = {'metadata.stats-mode': 'full'} if stats_enabled else {}
+ schema = Schema.from_pyarrow_schema(simple_pa_schema, options=options)
self.catalog.create_table('default.test_full_data_types', schema, False)
table = self.catalog.get_table('default.test_full_data_types')
@@ -226,14 +229,26 @@ class ReaderBasicTest(unittest.TestCase):
manifest_files = table_scan.starting_scanner.manifest_list_manager.read_all(latest_snapshot)
manifest_entries = table_scan.starting_scanner.manifest_file_manager.read(
manifest_files[0].file_name, lambda row: table_scan.starting_scanner._filter_manifest_entry(row), False)
- min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
- table.fields).values
- max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
- table.fields).values
- expected_min_values = [col[0].as_py() for col in expect_data]
- expected_max_values = [col[1].as_py() for col in expect_data]
- self.assertEqual(min_value_stats, expected_min_values)
- self.assertEqual(max_value_stats, expected_max_values)
+
+ # Python write does not produce value stats by default
+ if stats_enabled:
+ self.assertEqual(manifest_entries[0].file.value_stats_cols, None)
+ min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+ table.fields).values
+ max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+ table.fields).values
+ expected_min_values = [col[0].as_py() for col in expect_data]
+ expected_max_values = [col[1].as_py() for col in expect_data]
+ self.assertEqual(min_value_stats, expected_min_values)
+ self.assertEqual(max_value_stats, expected_max_values)
+ else:
+ self.assertEqual(manifest_entries[0].file.value_stats_cols, [])
+ min_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.min_values.data,
+ []).values
+ max_value_stats = GenericRowDeserializer.from_bytes(manifest_entries[0].file.value_stats.max_values.data,
+ []).values
+ self.assertEqual(min_value_stats, [])
+ self.assertEqual(max_value_stats, [])
def test_write_wrong_schema(self):
self.catalog.create_table('default.test_wrong_schema',
diff --git a/paimon-python/pypaimon/tests/schema_evolution_read_test.py b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
index 2ff4b09e53..f5dafaae35 100644
--- a/paimon-python/pypaimon/tests/schema_evolution_read_test.py
+++ b/paimon-python/pypaimon/tests/schema_evolution_read_test.py
@@ -210,7 +210,8 @@ class SchemaEvolutionReadTest(unittest.TestCase):
('item_id', pa.int64()),
('dt', pa.string())
])
- schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ options = {'metadata.stats-mode': 'full'}
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'], options=options)
self.catalog.create_table('default.test_schema_evolution1', schema, False)
table1 = self.catalog.get_table('default.test_schema_evolution1')
write_builder = table1.new_batch_write_builder()
@@ -275,7 +276,8 @@ class SchemaEvolutionReadTest(unittest.TestCase):
('item_id', pa.int64()),
('dt', pa.string())
])
- schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ options = {'metadata.stats-mode': 'full'}
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'], options=options)
self.catalog.create_table('default.test_schema_evolution_with_filter', schema, False)
table1 = self.catalog.get_table('default.test_schema_evolution_with_filter')
write_builder = table1.new_batch_write_builder()
@@ -299,7 +301,7 @@ class SchemaEvolutionReadTest(unittest.TestCase):
('dt', pa.string()),
('behavior', pa.string())
])
- schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'])
+ schema2 = Schema.from_pyarrow_schema(pa_schema, partition_keys=['dt'], options=options)
self.catalog.create_table('default.test_schema_evolution_with_filter2', schema2, False)
table2 = self.catalog.get_table('default.test_schema_evolution_with_filter2')
table2.table_schema.id = 1
diff --git a/paimon-python/pypaimon/write/writer/data_blob_writer.py b/paimon-python/pypaimon/write/writer/data_blob_writer.py
index c7dd1e890c..eaf2b9483c 100644
--- a/paimon-python/pypaimon/write/writer/data_blob_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_blob_writer.py
@@ -85,6 +85,7 @@ class DataBlobWriter(DataWriter):
# Split schema into normal and blob columns
all_column_names = self.table.field_names
self.normal_column_names = [col for col in all_column_names if col != self.blob_column_name]
+ self.normal_columns = [field for field in self.table.table_schema.fields if field.name != self.blob_column_name]
self.write_cols = self.normal_column_names
# State management for blob writer
@@ -196,13 +197,15 @@ class DataBlobWriter(DataWriter):
return normal_data, blob_data
- def _process_normal_data(self, data: pa.RecordBatch) -> pa.Table:
+ @staticmethod
+ def _process_normal_data(data: pa.RecordBatch) -> pa.Table:
"""Process normal data (similar to base DataWriter)."""
if data is None or data.num_rows == 0:
return pa.Table.from_batches([])
return pa.Table.from_batches([data])
- def _merge_normal_data(self, existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
+ @staticmethod
+ def _merge_normal_data(existing_data: pa.Table, new_data: pa.Table) -> pa.Table:
return pa.concat_tables([existing_data, new_data])
def _should_roll_normal(self) -> bool:
@@ -243,7 +246,7 @@ class DataBlobWriter(DataWriter):
logger.info(f"Closed both writers - normal: {normal_meta.file_name}, "
f"added {len(blob_metas)} blob file metadata after normal
metadata")
- def _write_normal_data_to_file(self, data: pa.Table) -> DataFileMeta:
+ def _write_normal_data_to_file(self, data: pa.Table) -> Optional[DataFileMeta]:
if data.num_rows == 0:
return None
@@ -271,19 +274,16 @@ class DataBlobWriter(DataWriter):
def _create_data_file_meta(self, file_name: str, file_path: str, data: pa.Table,
external_path: Optional[str] = None) -> DataFileMeta:
# Column stats (only for normal columns)
+ metadata_stats_enabled = self.options.metadata_stats_enabled()
+ stats_columns = self.normal_columns if metadata_stats_enabled else []
column_stats = {
field.name: self._get_column_stats(data, field.name)
- for field in self.table.table_schema.fields
- if field.name != self.blob_column_name
+ for field in stats_columns
}
- # Get normal fields only
- normal_fields = [field for field in self.table.table_schema.fields
- if field.name != self.blob_column_name]
-
- min_value_stats = [column_stats[field.name]['min_values'] for field in normal_fields]
- max_value_stats = [column_stats[field.name]['max_values'] for field in normal_fields]
- value_null_counts = [column_stats[field.name]['null_counts'] for field in normal_fields]
+ min_value_stats = [column_stats[field.name]['min_values'] for field in stats_columns]
+ max_value_stats = [column_stats[field.name]['max_values'] for field in stats_columns]
+ value_null_counts = [column_stats[field.name]['null_counts'] for field in stats_columns]
self.sequence_generator.start = self.sequence_generator.current
@@ -298,8 +298,8 @@ class DataBlobWriter(DataWriter):
GenericRow([], []),
[]),
value_stats=SimpleStats(
- GenericRow(min_value_stats, normal_fields),
- GenericRow(max_value_stats, normal_fields),
+ GenericRow(min_value_stats, stats_columns),
+ GenericRow(max_value_stats, stats_columns),
value_null_counts),
min_sequence_number=-1,
max_sequence_number=-1,
@@ -309,7 +309,7 @@ class DataBlobWriter(DataWriter):
creation_time=Timestamp.now(),
delete_row_count=0,
file_source=0,
- value_stats_cols=self.normal_column_names,
+ value_stats_cols=[column.name for column in stats_columns],
external_path=external_path,
file_path=file_path,
write_cols=self.write_cols)
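In the blob writer, the gating works by collapsing the field list: when 'metadata.stats-mode' is not "full", stats_columns is empty, so every stats structure derived from it is empty too. An isolated, runnable sketch of that gating (the DataField stand-in and sample column names are hypothetical):

    from collections import namedtuple

    DataField = namedtuple('DataField', ['name'])  # stand-in for pypaimon's field objects

    fields = [DataField('f0'), DataField('f1'), DataField('blob_col')]
    blob_column_name = 'blob_col'
    normal_columns = [f for f in fields if f.name != blob_column_name]

    metadata_stats_enabled = False  # the new default, 'metadata.stats-mode' = 'none'
    stats_columns = normal_columns if metadata_stats_enabled else []
    # With stats disabled, column_stats becomes an empty dict and the min/max/null-count
    # lists are all empty, so value_stats_cols is written as [].
    assert [c.name for c in stats_columns] == []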
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index d1559bc051..73609ed912 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -191,16 +191,17 @@ class DataWriter(ABC):
max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
# key stats & value stats
- data_fields = self.table.fields if self.table.is_primary_key_table \
- else PyarrowFieldParser.to_paimon_schema(data.schema)
+ value_stats_enabled = self.options.metadata_stats_enabled()
+ stats_fields = PyarrowFieldParser.to_paimon_schema(data.schema) if value_stats_enabled \
+ else self.table.trimmed_primary_keys_fields
column_stats = {
field.name: self._get_column_stats(data, field.name)
- for field in data_fields
+ for field in stats_fields
}
- all_fields = data_fields
- min_value_stats = [column_stats[field.name]['min_values'] for field in all_fields]
- max_value_stats = [column_stats[field.name]['max_values'] for field in all_fields]
- value_null_counts = [column_stats[field.name]['null_counts'] for field in all_fields]
+ data_fields = stats_fields if value_stats_enabled else []
+ min_value_stats = [column_stats[field.name]['min_values'] for field in data_fields]
+ max_value_stats = [column_stats[field.name]['max_values'] for field in data_fields]
+ value_null_counts = [column_stats[field.name]['null_counts'] for field in data_fields]
key_fields = self.trimmed_primary_keys_fields
min_key_stats = [column_stats[field.name]['min_values'] for field in key_fields]
max_key_stats = [column_stats[field.name]['max_values'] for field in key_fields]
@@ -235,7 +236,7 @@ class DataWriter(ABC):
creation_time=Timestamp.now(),
delete_row_count=0,
file_source=0,
- value_stats_cols=None, # None means all columns in the data have statistics
+ value_stats_cols=None if value_stats_enabled else [],
external_path=external_path_str, # Set external path if using external paths
first_row_id=None,
write_cols=self.write_cols,
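After this change, value_stats_cols is effectively tri-state: None means every written column has statistics, an explicit list names the columns that do, and [] (the new Python default) means none. A hedged reader-side sketch of that interpretation (the helper name is hypothetical; the semantics come from the comments in this diff):

    def stats_fields_for(file_meta, table_fields):
        # None: all columns in the data have statistics (stats-mode 'full').
        if file_meta.value_stats_cols is None:
            return table_fields
        # A list names exactly the columns with stats; [] means no value stats,
        # which is now the default for files written by Python.
        names = set(file_meta.value_stats_cols)
        return [f for f in table_fields if f.name in names]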