This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ed39c17de2 [python] Fix DLF partition statistical error (#6237)
ed39c17de2 is described below
commit ed39c17de25d655586a02b5598262ce7d04ef052
Author: umi <[email protected]>
AuthorDate: Thu Sep 11 13:20:23 2025 +0800
[python] Fix DLF partition statistical error (#6237)
---
.../pypaimon/tests/py36/ao_read_write_test.py | 122 +++++++++++++++++++++
paimon-python/pypaimon/tests/reader_basic_test.py | 122 +++++++++++++++++++++
paimon-python/pypaimon/write/file_store_commit.py | 6 +-
3 files changed, 247 insertions(+), 3 deletions(-)
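The gist of the change, as a hedged sketch (not the pypaimon API; merge_partition_stats and the tuples below are made up for illustration): DELETE entries (kind=1) must contribute negative deltas when per-partition statistics are aggregated, so that mixed ADD/DELETE entries in one partition net out instead of being double-counted.

    from collections import defaultdict

    def merge_partition_stats(entries):
        # entries: (partition, kind, row_count, file_size); kind 0 = ADD, 1 = DELETE
        stats = defaultdict(lambda: {"record_count": 0, "file_count": 0, "file_size_in_bytes": 0})
        for partition, kind, row_count, file_size in entries:
            sign = 1 if kind == 0 else -1  # DELETE entries subtract
            stats[partition]["record_count"] += sign * row_count
            stats[partition]["file_count"] += sign
            stats[partition]["file_size_in_bytes"] += sign * file_size
        return dict(stats)

    # Mirrors the first new test: ADD (200 rows, 2048 bytes) plus DELETE (80 rows, 800 bytes)
    # in the same partition nets to 120 rows, 0 files, 1248 bytes.
    print(merge_partition_stats([
        (("East", "Boston"), 0, 200, 2048),
        (("East", "Boston"), 1, 80, 800),
    ]))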
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index 8f3201d056..9873d6bf3f 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -16,6 +16,8 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
+from datetime import datetime
+from unittest.mock import Mock
import pandas as pd
import pyarrow as pa
@@ -34,6 +36,7 @@ from pypaimon.table.row.generic_row import GenericRow, GenericRowSerializer, Gen
from pypaimon.table.row.row_kind import RowKind
from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+from pypaimon.write.file_store_commit import FileStoreCommit
class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
@@ -119,6 +122,125 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+ def test_mixed_add_and_delete_entries_same_partition(self):
+ """Test record_count calculation with mixed ADD/DELETE entries in same
partition."""
+ pa_schema = pa.schema([
+ ('region', pa.string()),
+ ('city', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.rest_catalog.create_table('default.tb', schema, False)
+ table = self.rest_catalog.get_table('default.tb')
+ partition_fields = [
+ DataField(0, "region", AtomicType("STRING")),
+ DataField(1, "city", AtomicType("STRING"))
+ ]
+ partition = GenericRow(['East', 'Boston'], partition_fields)
+
+ # Create ADD entry
+ add_file_meta = Mock(spec=DataFileMeta)
+ add_file_meta.row_count = 200
+ add_file_meta.file_size = 2048
+ add_file_meta.creation_time = datetime.now()
+
+ add_entry = ManifestEntry(
+ kind=0, # ADD
+ partition=partition,
+ bucket=0,
+ total_buckets=1,
+ file=add_file_meta
+ )
+
+ # Create DELETE entry
+ delete_file_meta = Mock(spec=DataFileMeta)
+ delete_file_meta.row_count = 80
+ delete_file_meta.file_size = 800
+ delete_file_meta.creation_time = datetime.now()
+
+ delete_entry = ManifestEntry(
+ kind=1, # DELETE
+ partition=partition,
+ bucket=0,
+ total_buckets=1,
+ file=delete_file_meta
+ )
+ file_store_commit = FileStoreCommit(None, table, "")
+ # Test the method with both entries
+ statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])
+
+ # Verify results - should be merged into single partition statistics
+ self.assertEqual(len(statistics), 1)
+ stat = statistics[0]
+
+ # Net record count: +200 + (-80) = 120
+ self.assertEqual(stat.record_count, 120)
+ self.assertEqual(stat.file_count, 0)
+ self.assertEqual(stat.file_size_in_bytes, 1248)
+
+ def test_multiple_partitions_with_different_operations(self):
+ """Test record_count calculation across multiple partitions."""
+ pa_schema = pa.schema([
+ ('region', pa.string()),
+ ('city', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.rest_catalog.create_table('default.tb1', schema, False)
+ table = self.rest_catalog.get_table('default.tb1')
+ partition_fields = [
+ DataField(0, "region", AtomicType("STRING")),
+ DataField(1, "city", AtomicType("STRING"))
+ ]
+ partition1 = GenericRow(['East', 'Boston'], partition_fields)
+ file_meta1 = Mock(spec=DataFileMeta)
+ file_meta1.row_count = 150
+ file_meta1.file_size = 1500
+ file_meta1.creation_time = datetime.now()
+
+ entry1 = ManifestEntry(
+ kind=0, # ADD
+ partition=partition1,
+ bucket=0,
+ total_buckets=1,
+ file=file_meta1
+ )
+
+ # Partition 2: South/LA - DELETE operation
+ partition2 = GenericRow(['South', 'LA'], partition_fields)
+ file_meta2 = Mock(spec=DataFileMeta)
+ file_meta2.row_count = 75
+ file_meta2.file_size = 750
+ file_meta2.creation_time = datetime.now()
+
+ entry2 = ManifestEntry(
+ kind=1, # DELETE
+ partition=partition2,
+ bucket=0,
+ total_buckets=1,
+ file=file_meta2
+ )
+
+ file_store_commit = FileStoreCommit(None, table, "")
+ # Test the method with both entries
+ statistics = file_store_commit._generate_partition_statistics([entry1, entry2])
+
+ # Verify results - should have 2 separate partition statistics
+ self.assertEqual(len(statistics), 2)
+
+ # Sort by partition spec for consistent testing
+ statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))
+
+ # Check East/Boston partition (ADD)
+ east_stat = statistics[0]
+ self.assertEqual(east_stat.record_count, 150) # Positive for ADD
+ self.assertEqual(east_stat.file_count, 1)
+ self.assertEqual(east_stat.file_size_in_bytes, 1500)
+
+ # Check South/LA partition (DELETE)
+ south_stat = statistics[1]
+ self.assertEqual(south_stat.record_count, -75) # Negative for DELETE
+ self.assertEqual(south_stat.file_count, -1)
+ self.assertEqual(south_stat.file_size_in_bytes, -750)
+
def testParquetAppendOnlyReader(self):
schema = Schema.from_pyarrow_schema(self.pa_schema,
partition_keys=['dt'])
self.rest_catalog.create_table('default.test_append_only_parquet',
schema, False)
diff --git a/paimon-python/pypaimon/tests/reader_basic_test.py b/paimon-python/pypaimon/tests/reader_basic_test.py
index 8354ed206a..62255c8a79 100644
--- a/paimon-python/pypaimon/tests/reader_basic_test.py
+++ b/paimon-python/pypaimon/tests/reader_basic_test.py
@@ -20,6 +20,8 @@ import os
import shutil
import tempfile
import unittest
+from datetime import datetime
+from unittest.mock import Mock
import pandas as pd
import pyarrow as pa
@@ -34,6 +36,7 @@ from pypaimon.manifest.manifest_file_manager import ManifestFileManager
from pypaimon.manifest.schema.simple_stats import SimpleStats
from pypaimon.manifest.schema.data_file_meta import DataFileMeta
from pypaimon.manifest.schema.manifest_entry import ManifestEntry
+from pypaimon.write.file_store_commit import FileStoreCommit
class ReaderBasicTest(unittest.TestCase):
@@ -156,6 +159,125 @@ class ReaderBasicTest(unittest.TestCase):
pd.testing.assert_frame_equal(
actual_df2.reset_index(drop=True), df2.reset_index(drop=True))
+ def test_mixed_add_and_delete_entries_same_partition(self):
+ """Test record_count calculation with mixed ADD/DELETE entries in same
partition."""
+ pa_schema = pa.schema([
+ ('region', pa.string()),
+ ('city', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.catalog.create_table('default.tb', schema, False)
+ table = self.catalog.get_table('default.tb')
+ partition_fields = [
+ DataField(0, "region", AtomicType("STRING")),
+ DataField(1, "city", AtomicType("STRING"))
+ ]
+ partition = GenericRow(['East', 'Boston'], partition_fields)
+
+ # Create ADD entry
+ add_file_meta = Mock(spec=DataFileMeta)
+ add_file_meta.row_count = 200
+ add_file_meta.file_size = 2048
+ add_file_meta.creation_time = datetime.now()
+
+ add_entry = ManifestEntry(
+ kind=0, # ADD
+ partition=partition,
+ bucket=0,
+ total_buckets=1,
+ file=add_file_meta
+ )
+
+ # Create DELETE entry
+ delete_file_meta = Mock(spec=DataFileMeta)
+ delete_file_meta.row_count = 80
+ delete_file_meta.file_size = 800
+ delete_file_meta.creation_time = datetime.now()
+
+ delete_entry = ManifestEntry(
+ kind=1, # DELETE
+ partition=partition,
+ bucket=0,
+ total_buckets=1,
+ file=delete_file_meta
+ )
+ file_store_commit = FileStoreCommit(None, table, "")
+ # Test the method with both entries
+ statistics = file_store_commit._generate_partition_statistics([add_entry, delete_entry])
+
+ # Verify results - should be merged into single partition statistics
+ self.assertEqual(len(statistics), 1)
+ stat = statistics[0]
+
+ # Net record count: +200 + (-80) = 120
+ self.assertEqual(stat.record_count, 120)
+ self.assertEqual(stat.file_count, 0)
+ self.assertEqual(stat.file_size_in_bytes, 1248)
+
+ def test_multiple_partitions_with_different_operations(self):
+ """Test record_count calculation across multiple partitions."""
+ pa_schema = pa.schema([
+ ('region', pa.string()),
+ ('city', pa.string())
+ ])
+ schema = Schema.from_pyarrow_schema(pa_schema)
+ self.catalog.create_table('default.tb1', schema, False)
+ table = self.catalog.get_table('default.tb1')
+ partition_fields = [
+ DataField(0, "region", AtomicType("STRING")),
+ DataField(1, "city", AtomicType("STRING"))
+ ]
+ partition1 = GenericRow(['East', 'Boston'], partition_fields)
+ file_meta1 = Mock(spec=DataFileMeta)
+ file_meta1.row_count = 150
+ file_meta1.file_size = 1500
+ file_meta1.creation_time = datetime.now()
+
+ entry1 = ManifestEntry(
+ kind=0, # ADD
+ partition=partition1,
+ bucket=0,
+ total_buckets=1,
+ file=file_meta1
+ )
+
+ # Partition 2: South/LA - DELETE operation
+ partition2 = GenericRow(['South', 'LA'], partition_fields)
+ file_meta2 = Mock(spec=DataFileMeta)
+ file_meta2.row_count = 75
+ file_meta2.file_size = 750
+ file_meta2.creation_time = datetime.now()
+
+ entry2 = ManifestEntry(
+ kind=1, # DELETE
+ partition=partition2,
+ bucket=0,
+ total_buckets=1,
+ file=file_meta2
+ )
+
+ file_store_commit = FileStoreCommit(None, table, "")
+ # Test the method with both entries
+ statistics = file_store_commit._generate_partition_statistics([entry1, entry2])
+
+ # Verify results - should have 2 separate partition statistics
+ self.assertEqual(len(statistics), 2)
+
+ # Sort by partition spec for consistent testing
+ statistics.sort(key=lambda s: (s.spec.get('region', ''), s.spec.get('city', '')))
+
+ # Check East/Boston partition (ADD)
+ east_stat = statistics[0]
+ self.assertEqual(east_stat.record_count, 150) # Positive for ADD
+ self.assertEqual(east_stat.file_count, 1)
+ self.assertEqual(east_stat.file_size_in_bytes, 1500)
+
+ # Check South/LA partition (DELETE)
+ south_stat = statistics[1]
+ self.assertEqual(south_stat.record_count, -75) # Negative for DELETE
+ self.assertEqual(south_stat.file_count, -1)
+ self.assertEqual(south_stat.file_size_in_bytes, -750)
+
def testWriteWrongSchema(self):
self.catalog.create_table('default.test_wrong_schema',
Schema.from_pyarrow_schema(self.pa_schema),
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index 8136117092..03c4d034d0 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -281,9 +281,9 @@ class FileStoreCommit:
# Following Java implementation: PartitionEntry.fromDataFile()
file_meta = entry.file
# Extract actual file metadata (following Java DataFileMeta pattern)
- record_count = file_meta.row_count
- file_size_in_bytes = file_meta.file_size
- file_count = 1
+ record_count = file_meta.row_count if entry.kind == 0 else file_meta.row_count * -1
+ file_size_in_bytes = file_meta.file_size if entry.kind == 0 else file_meta.file_size * -1
+ file_count = 1 if entry.kind == 0 else -1
# Convert creation_time to milliseconds (Java uses epoch millis)
if file_meta.creation_time:
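
For reference, a standalone illustration of how the changed expressions behave for both entry kinds (values taken from the new tests; _Meta is a stand-in for DataFileMeta and the local kind replaces entry.kind):

    class _Meta:  # illustration only, not the real DataFileMeta
        row_count, file_size = 80, 800

    file_meta = _Meta()
    for kind in (0, 1):  # 0 = ADD, 1 = DELETE
        record_count = file_meta.row_count if kind == 0 else file_meta.row_count * -1
        file_size_in_bytes = file_meta.file_size if kind == 0 else file_meta.file_size * -1
        file_count = 1 if kind == 0 else -1
        print(kind, record_count, file_count, file_size_in_bytes)  # -> 0 80 1 800, then 1 -80 -1 -800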