This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit a66f87d16fcd40fd7f3fda9fda2afeb6d010bd57
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Aug 27 11:36:21 2025 +0800

    [Python] Enhanced Manifest Statistics & Hybrid Java/Python Writer Tests (#6119)
---
 .github/workflows/paimon-python-checks.yml         |   2 +-
 .../pypaimon/catalog/filesystem_catalog.py         |   4 +-
 .../pypaimon/catalog/renaming_snapshot_commit.py   |   2 +-
 .../pypaimon/manifest/manifest_file_manager.py     |  92 ++++++++-----
 .../pypaimon/manifest/manifest_list_manager.py     |  73 +++++++----
 .../pypaimon/manifest/schema/data_file_meta.py     |  36 ++---
 .../pypaimon/manifest/schema/manifest_entry.py     |   1 +
 .../pypaimon/manifest/schema/manifest_file_meta.py |   5 +-
 .../pypaimon/manifest/schema/simple_stats.py       |  17 ++-
 .../read/reader/data_file_record_reader.py         |  16 ++-
 paimon-python/pypaimon/read/split_read.py          |   6 +-
 paimon-python/pypaimon/read/table_scan.py          |   6 +-
 paimon-python/pypaimon/schema/data_types.py        |  13 +-
 paimon-python/pypaimon/schema/schema_manager.py    |  11 +-
 paimon-python/pypaimon/schema/table_schema.py      |  37 +-----
 paimon-python/pypaimon/table/file_store_table.py   |   5 +-
 paimon-python/pypaimon/table/row/binary_row.py     | 145 ++++++---------------
 .../pypaimon/tests/reader_primary_key_test.py      |   4 +-
 .../pypaimon/tests/rest_catalog_base_test.py       |   2 +-
 paimon-python/pypaimon/tests/rest_server.py        |   2 +-
 paimon-python/pypaimon/tests/rest_table_test.py    |   2 +-
 paimon-python/pypaimon/tests/schema_test.py        |   3 +-
 paimon-python/pypaimon/tests/writer_test.py        |   2 +-
 paimon-python/pypaimon/write/commit_message.py     |  26 +---
 paimon-python/pypaimon/write/file_store_commit.py  | 114 ++++++++--------
 paimon-python/pypaimon/write/file_store_write.py   |  34 ++++-
 paimon-python/pypaimon/write/writer/data_writer.py | 132 ++++++++++++++-----
 .../pypaimon/write/writer/key_value_data_writer.py |  16 +--
 paimon-python/setup.py                             |   3 +-
 29 files changed, 421 insertions(+), 390 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 9a38e9b5c2..6f6a8a23a8 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -46,7 +46,7 @@ jobs:
           python-version: ${{ env.PYTHON_VERSION }}
       - name: Install dependencies
         run: |
-          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
       - name: Run lint-python.sh
         run: |
           chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 68d1e438c8..1aed6e8137 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -17,7 +17,7 @@
 ################################################################################
 
 from pathlib import Path
-from typing import Optional, Union, List
+from typing import List, Optional, Union
 from urllib.parse import urlparse
 
 from pypaimon.catalog.catalog import Catalog
@@ -67,7 +67,7 @@ class FileSystemCatalog(Catalog):
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
         if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
-            raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
+            raise ValueError(f"Unsupported CoreOption {CoreOptions.SCAN_FALLBACK_BRANCH}")
         table_path = self.get_table_path(identifier)
         table_schema = self.get_table_schema(identifier)
 
diff --git a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
index 4a2e70e4e2..b85c4200a1 100644
--- a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
+++ b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
@@ -63,7 +63,7 @@ class RenamingSnapshotCommit(SnapshotCommit):
         if not self.file_io.exists(new_snapshot_path):
             """Internal function to perform the actual commit."""
             # Try to write atomically using the file IO
-            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot))
+            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot, indent=2))
             if committed:
                 # Update the latest hint
                 self._commit_latest_hint(snapshot.id)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index ef674e5b88..7c46b368d2 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import uuid
 from io import BytesIO
 from typing import List
 
@@ -24,8 +23,10 @@ import fastavro
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
                                            BinaryRowSerializer)
+from pypaimon.write.commit_message import CommitMessage
 
 
 class ManifestFileManager:
@@ -51,20 +52,40 @@ class ManifestFileManager:
         reader = fastavro.reader(buffer)
 
         for record in reader:
-            file_info = dict(record['_FILE'])
+            file_dict = dict(record['_FILE'])
+            key_dict = dict(file_dict['_KEY_STATS'])
+            key_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
+                                                           self.trimmed_primary_key_fields),
+                max_value=BinaryRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
+                                                           self.trimmed_primary_key_fields),
+                null_count=key_dict['_NULL_COUNTS'],
+            )
+            value_dict = dict(file_dict['_VALUE_STATS'])
+            value_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(value_dict['_MIN_VALUES'],
+                                                           self.table.table_schema.fields),
+                max_value=BinaryRowDeserializer.from_bytes(value_dict['_MAX_VALUES'],
+                                                           self.table.table_schema.fields),
+                null_count=value_dict['_NULL_COUNTS'],
+            )
             file_meta = DataFileMeta(
-                file_name=file_info['_FILE_NAME'],
-                file_size=file_info['_FILE_SIZE'],
-                row_count=file_info['_ROW_COUNT'],
-                min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
-                max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
-                key_stats=None,  # TODO
-                value_stats=None,  # TODO
-                min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
-                max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
-                schema_id=file_info['_SCHEMA_ID'],
-                level=file_info['_LEVEL'],
-                extra_files=None,  # TODO
+                file_name=file_dict['_FILE_NAME'],
+                file_size=file_dict['_FILE_SIZE'],
+                row_count=file_dict['_ROW_COUNT'],
+                min_key=BinaryRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_key_fields),
+                max_key=BinaryRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_key_fields),
+                key_stats=key_stats,
+                value_stats=value_stats,
+                min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'],
+                max_sequence_number=file_dict['_MAX_SEQUENCE_NUMBER'],
+                schema_id=file_dict['_SCHEMA_ID'],
+                level=file_dict['_LEVEL'],
+                extra_files=file_dict['_EXTRA_FILES'],
+                creation_time=file_dict['_CREATION_TIME'],
+                delete_row_count=file_dict['_DELETE_ROW_COUNT'],
+                embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
+                file_source=file_dict['_FILE_SOURCE'],
             )
             entry = ManifestEntry(
                 kind=record['_KIND'],
@@ -73,22 +94,23 @@ class ManifestFileManager:
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
             )
-            if not shard_filter(entry):
+            if shard_filter is not None and not shard_filter(entry):
                 continue
             entries.append(entry)
         return entries
 
-    def write(self, commit_messages: List['CommitMessage']) -> List[str]:
+    def write(self, file_name, commit_messages: List[CommitMessage]):
         avro_records = []
         for message in commit_messages:
             partition_bytes = BinaryRowSerializer.to_bytes(
-                BinaryRow(list(message.partition()), self.table.table_schema.get_partition_key_fields()))
-            for file in message.new_files():
+                BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
+            for file in message.new_files:
                 avro_record = {
+                    "_VERSION": 2,
                     "_KIND": 0,
                     "_PARTITION": partition_bytes,
-                    "_BUCKET": message.bucket(),
-                    "_TOTAL_BUCKETS": -1,  # TODO
+                    "_BUCKET": message.bucket,
+                    "_TOTAL_BUCKETS": self.table.total_buckets,
                     "_FILE": {
                         "_FILE_NAME": file.file_name,
                         "_FILE_SIZE": file.file_size,
@@ -96,33 +118,35 @@ class ManifestFileManager:
                         "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
                         "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
                         "_KEY_STATS": {
-                            "_MIN_VALUES": None,
-                            "_MAX_VALUES": None,
-                            "_NULL_COUNTS": 0,
+                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_value),
+                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_value),
+                            "_NULL_COUNTS": file.key_stats.null_count,
                         },
                         "_VALUE_STATS": {
-                            "_MIN_VALUES": None,
-                            "_MAX_VALUES": None,
-                            "_NULL_COUNTS": 0,
+                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_value),
+                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_value),
+                            "_NULL_COUNTS": file.value_stats.null_count,
                         },
-                        "_MIN_SEQUENCE_NUMBER": 0,
-                        "_MAX_SEQUENCE_NUMBER": 0,
-                        "_SCHEMA_ID": 0,
-                        "_LEVEL": 0,
-                        "_EXTRA_FILES": [],
+                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
+                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
+                        "_SCHEMA_ID": file.schema_id,
+                        "_LEVEL": file.level,
+                        "_EXTRA_FILES": file.extra_files,
+                        "_CREATION_TIME": file.creation_time,
+                        "_DELETE_ROW_COUNT": file.delete_row_count,
+                        "_EMBEDDED_FILE_INDEX": file.embedded_index,
+                        "_FILE_SOURCE": file.file_source,
                     }
                 }
                 avro_records.append(avro_record)
 
-        manifest_filename = f"manifest-{str(uuid.uuid4())}.avro"
-        manifest_path = self.manifest_path / manifest_filename
+        manifest_path = self.manifest_path / file_name
         try:
             buffer = BytesIO()
             fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
             avro_bytes = buffer.getvalue()
             with self.file_io.new_output_stream(manifest_path) as output_stream:
                 output_stream.write(avro_bytes)
-            return [str(manifest_filename)]
         except Exception as e:
             self.file_io.delete_quietly(manifest_path)
             raise RuntimeError(f"Failed to write manifest file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index a338441465..65fd2b21ac 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -16,15 +16,17 @@
 # limitations under the License.
 ################################################################################
 
-import uuid
 from io import BytesIO
-from typing import List, Optional
+from typing import List
 
 import fastavro
 
-from pypaimon.manifest.schema.manifest_file_meta import \
-    MANIFEST_FILE_META_SCHEMA
+from pypaimon.manifest.schema.manifest_file_meta import (
+    MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
+                                           BinaryRowSerializer)
 
 
 class ManifestListManager:
@@ -37,57 +39,72 @@ class ManifestListManager:
         self.manifest_path = self.table.table_path / "manifest"
         self.file_io = self.table.file_io
 
-    def read_all_manifest_files(self, snapshot: Snapshot) -> List[str]:
+    def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
         manifest_files = []
         base_manifests = self.read(snapshot.base_manifest_list)
         manifest_files.extend(base_manifests)
         delta_manifests = self.read(snapshot.delta_manifest_list)
         manifest_files.extend(delta_manifests)
-        return list(set(manifest_files))
+        return manifest_files
 
-    def read(self, manifest_list_name: str) -> List[str]:
-        manifest_list_path = self.manifest_path / manifest_list_name
-        manifest_paths = []
+    def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
+        manifest_files = []
 
+        manifest_list_path = self.manifest_path / manifest_list_name
         with self.file_io.new_input_stream(manifest_list_path) as input_stream:
             avro_bytes = input_stream.read()
         buffer = BytesIO(avro_bytes)
         reader = fastavro.reader(buffer)
         for record in reader:
-            file_name = record['_FILE_NAME']
-            manifest_paths.append(file_name)
-
-        return manifest_paths
+            stats_dict = dict(record['_PARTITION_STATS'])
+            partition_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(
+                    stats_dict['_MIN_VALUES'],
+                    self.table.table_schema.get_partition_key_fields()
+                ),
+                max_value=BinaryRowDeserializer.from_bytes(
+                    stats_dict['_MAX_VALUES'],
+                    self.table.table_schema.get_partition_key_fields()
+                ),
+                null_count=stats_dict['_NULL_COUNTS'],
+            )
+            manifest_file_meta = ManifestFileMeta(
+                file_name=record['_FILE_NAME'],
+                file_size=record['_FILE_SIZE'],
+                num_added_files=record['_NUM_ADDED_FILES'],
+                num_deleted_files=record['_NUM_DELETED_FILES'],
+                partition_stats=partition_stats,
+                schema_id=record['_SCHEMA_ID'],
+            )
+            manifest_files.append(manifest_file_meta)
 
-    def write(self, manifest_file_names: List[str]) -> Optional[str]:
-        if not manifest_file_names:
-            return None
+        return manifest_files
 
+    def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]):
         avro_records = []
-        for manifest_file_name in manifest_file_names:
+        for meta in manifest_file_metas:
             avro_record = {
-                "_FILE_NAME": manifest_file_name,
-                "_FILE_SIZE": 0,  # TODO
-                "_NUM_ADDED_FILES": 0,
-                "_NUM_DELETED_FILES": 0,
+                "_VERSION": 2,
+                "_FILE_NAME": meta.file_name,
+                "_FILE_SIZE": meta.file_size,
+                "_NUM_ADDED_FILES": meta.num_added_files,
+                "_NUM_DELETED_FILES": meta.num_deleted_files,
                 "_PARTITION_STATS": {
-                    "_MIN_VALUES": None,
-                    "_MAX_VALUES": None,
-                    "_NULL_COUNTS": 0,
+                    "_MIN_VALUES": BinaryRowSerializer.to_bytes(meta.partition_stats.min_value),
+                    "_MAX_VALUES": BinaryRowSerializer.to_bytes(meta.partition_stats.max_value),
+                    "_NULL_COUNTS": meta.partition_stats.null_count,
                 },
-                "_SCHEMA_ID": 0,
+                "_SCHEMA_ID": meta.schema_id,
             }
             avro_records.append(avro_record)
 
-        list_filename = f"manifest-list-{str(uuid.uuid4())}.avro"
-        list_path = self.manifest_path / list_filename
+        list_path = self.manifest_path / file_name
         try:
             buffer = BytesIO()
             fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
             avro_bytes = buffer.getvalue()
             with self.file_io.new_output_stream(list_path) as output_stream:
                 output_stream.write(avro_bytes)
-            return list_filename
         except Exception as e:
             self.file_io.delete_quietly(list_path)
             raise RuntimeError(f"Failed to write manifest list file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 02a7179218..e1f60bf2e1 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -19,7 +19,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
-from typing import List, Optional
+from typing import List
 
 from pypaimon.manifest.schema.simple_stats import (SIMPLE_STATS_SCHEMA,
                                                    SimpleStats)
@@ -31,34 +31,32 @@ class DataFileMeta:
     file_name: str
     file_size: int
     row_count: int
-    min_key: Optional[BinaryRow]
-    max_key: Optional[BinaryRow]
-    key_stats: Optional[SimpleStats]
-    value_stats: Optional[SimpleStats]
+    min_key: BinaryRow
+    max_key: BinaryRow
+    key_stats: SimpleStats
+    value_stats: SimpleStats
     min_sequence_number: int
     max_sequence_number: int
     schema_id: int
     level: int
-    extra_files: Optional[List[str]]
+    extra_files: List[str]
 
-    creation_time: Optional[datetime] = None
-    delete_row_count: Optional[int] = None
-    embedded_index: Optional[bytes] = None
-    file_source: Optional[str] = None
-    value_stats_cols: Optional[List[str]] = None
-    external_path: Optional[str] = None
+    creation_time: datetime | None = None
+    delete_row_count: int | None = None
+    embedded_index: bytes | None = None
+    file_source: str | None = None
+    value_stats_cols: List[str] | None = None
+    external_path: str | None = None
 
+    # not a schema field, just for internal usage
     file_path: str = None
 
     def set_file_path(self, table_path: Path, partition: BinaryRow, bucket: int):
         path_builder = table_path
-
         partition_dict = partition.to_dict()
         for field_name, field_value in partition_dict.items():
             path_builder = path_builder / (field_name + "=" + str(field_value))
-
         path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
-
         self.file_path = str(path_builder)
 
 
@@ -78,11 +76,13 @@ DATA_FILE_META_SCHEMA = {
         {"name": "_SCHEMA_ID", "type": "long"},
         {"name": "_LEVEL", "type": "int"},
         {"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}},
-        {"name": "_CREATION_TIME", "type": ["null", "long"], "default": None},
+        {"name": "_CREATION_TIME",
+         "type": [
+             "null",
+             {"type": "long", "logicalType": "timestamp-millis"}],
+         "default": None},
         {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None},
         {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None},
         {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
-        {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": None},
-        {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
     ]
 }
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 1a80553188..75a51f30c5 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -36,6 +36,7 @@ MANIFEST_ENTRY_SCHEMA = {
     "type": "record",
     "name": "ManifestEntry",
     "fields": [
+        {"name": "_VERSION", "type": "int"},
         {"name": "_KIND", "type": "int"},
         {"name": "_PARTITION", "type": "bytes"},
         {"name": "_BUCKET", "type": "int"},
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index a001fca4c2..443c6a0944 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -30,16 +30,13 @@ class ManifestFileMeta:
     num_deleted_files: int
     partition_stats: SimpleStats
     schema_id: int
-    min_bucket: int
-    max_bucket: int
-    min_level: int
-    max_level: int
 
 
 MANIFEST_FILE_META_SCHEMA = {
     "type": "record",
     "name": "ManifestFileMeta",
     "fields": [
+        {"name": "_VERSION", "type": "int"},
         {"name": "_FILE_NAME", "type": "string"},
         {"name": "_FILE_SIZE", "type": "long"},
         {"name": "_NUM_ADDED_FILES", "type": "long"},
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index b291c12852..dd6924fb2e 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -17,6 +17,7 @@
 ################################################################################
 
 from dataclasses import dataclass
+from typing import List
 
 from pypaimon.table.row.binary_row import BinaryRow
 
@@ -25,15 +26,23 @@ from pypaimon.table.row.binary_row import BinaryRow
 class SimpleStats:
     min_value: BinaryRow
     max_value: BinaryRow
-    null_count: int
+    null_count: List[int] | None
 
 
 SIMPLE_STATS_SCHEMA = {
     "type": "record",
     "name": "SimpleStats",
     "fields": [
-        {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
+        {"name": "_MIN_VALUES", "type": "bytes"},
+        {"name": "_MAX_VALUES", "type": "bytes"},
+        {"name": "_NULL_COUNTS",
+         "type": [
+             "null",
+             {
+                 "type": "array",
+                 "items": ["null", "long"]
+             }
+         ],
+         "default": None},
     ]
 }
diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py b/paimon-python/pypaimon/read/reader/data_file_record_reader.py
index 41c1bf7517..c83f1ce152 100644
--- a/paimon-python/pypaimon/read/reader/data_file_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_record_reader.py
@@ -23,6 +23,7 @@ from pyarrow import RecordBatch
 
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 
 
 class DataFileBatchReader(RecordBatchReader):
@@ -31,11 +32,12 @@ class DataFileBatchReader(RecordBatchReader):
     """
 
     def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo,
-                 system_primary_key: Optional[List[str]]):
+                 system_primary_key: Optional[List[str]], fields: List[DataField]):
         self.format_reader = format_reader
         self.index_mapping = index_mapping
         self.partition_info = partition_info
         self.system_primary_key = system_primary_key
+        self.schema_map = {field.name: field for field in PyarrowFieldParser.from_paimon_schema(fields)}
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         record_batch = self.format_reader.read_arrow_batch()
@@ -85,7 +87,17 @@ class DataFileBatchReader(RecordBatchReader):
             inter_arrays = mapped_arrays
             inter_names = mapped_names
 
-        return pa.RecordBatch.from_arrays(inter_arrays, names=inter_names)
+        # to contains 'not null' property
+        final_fields = []
+        for i, name in enumerate(inter_names):
+            array = inter_arrays[i]
+            target_field = self.schema_map.get(name)
+            if not target_field:
+                target_field = pa.field(name, array.type)
+            final_fields.append(target_field)
+        final_schema = pa.schema(final_fields)
+
+        return pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema)
 
     def close(self) -> None:
         self.format_reader.close()
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index ea37593fdc..f085bac444 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -83,9 +83,11 @@ class SplitRead(ABC):
         index_mapping = self.create_index_mapping()
         partition_info = self.create_partition_info()
         if for_merge_read:
-            return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key)
+            return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key,
+                                       self.table.table_schema.fields)
         else:
-            return DataFileBatchReader(format_reader, index_mapping, partition_info, None)
+            return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
+                                       self.table.table_schema.fields)
 
     @abstractmethod
     def _get_all_data_fields(self):
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 2745b1d1b3..d89ddd0bcb 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -63,11 +63,11 @@ class TableScan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
             return Plan([])
-        manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+        manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
 
         file_entries = []
-        for manifest_file_path in manifest_files:
-            manifest_entries = self.manifest_file_manager.read(manifest_file_path,
+        for manifest_file in manifest_files:
+            manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
                                                                lambda row: self._bucket_filter(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 8a22e7f012..a5186cc56c 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -279,13 +279,16 @@ class DataTypeParser:
 
         if "(" in type_upper:
             base_type = type_upper.split("(")[0]
+        elif " " in type_upper:
+            base_type = type_upper.split(" ")[0]
+            type_upper = base_type
         else:
             base_type = type_upper
 
         try:
             Keyword(base_type)
             return AtomicType(
-                type_string, DataTypeParser.parse_nullability(type_string)
+                type_upper, DataTypeParser.parse_nullability(type_string)
             )
         except ValueError:
             raise Exception(f"Unknown type: {base_type}")
@@ -345,11 +348,7 @@ class DataTypeParser:
     def parse_data_field(
             json_data: Dict[str, Any], field_id: Optional[AtomicInteger] = None
     ) -> DataField:
-
-        if (
-                DataField.FIELD_ID in json_data
-                and json_data[DataField.FIELD_ID] is not None
-        ):
+        if DataField.FIELD_ID in json_data and json_data[DataField.FIELD_ID] is not None:
             if field_id is not None and field_id.get() != -1:
                 raise ValueError("Partial field id is not allowed.")
             field_id_value = int(json_data["id"])
@@ -486,7 +485,7 @@ class PyarrowFieldParser:
             return MapType(nullable, key_type, value_type)
         else:
             raise ValueError(f"Unknown type: {type_name}")
-        return AtomicType(type_name)
+        return AtomicType(type_name, nullable)
 
     @staticmethod
     def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField:
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index f0a108befb..f03b9e111b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -19,6 +19,7 @@ from pathlib import Path
 from typing import Optional
 
 from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.table_schema import TableSchema
 
@@ -42,15 +43,11 @@ class SchemaManager:
         except Exception as e:
             raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e
 
-    def create_table(self, schema: Schema, external_table: bool = False) -> TableSchema:
+    def create_table(self, schema: Schema) -> TableSchema:
         while True:
             latest = self.latest()
             if latest is not None:
-                if external_table:
-                    self._check_schema_for_external_table(latest.to_schema(), schema)
-                    return latest
-                else:
-                    raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
+                raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
 
             table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
             success = self.commit(table_schema)
@@ -60,7 +57,7 @@ class SchemaManager:
     def commit(self, new_schema: TableSchema) -> bool:
         schema_path = self._to_schema_path(new_schema.id)
         try:
-            return self.file_io.try_to_write_atomic(schema_path, new_schema.to_json())
+            return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
         except Exception as e:
             raise RuntimeError(f"Failed to commit schema: {e}") from e
 
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 80b787eb6e..dc1872deb6 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -57,22 +57,6 @@ class TableSchema:
     comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
     time_millis: int = json_field("timeMillis", default_factory=lambda: int(time.time() * 1000))
 
-    def __init__(self, version: int, id: int, fields: List[DataField], highest_field_id: int,
-                 partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
-                 comment: Optional[str] = None, time_millis: Optional[int] = None):
-        self.version = version
-        self.id = id
-        self.fields = fields
-        self.highest_field_id = highest_field_id
-        self.partition_keys = partition_keys or []
-        self.primary_keys = primary_keys or []
-        self.options = options or {}
-        self.comment = comment
-        self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
-        self.get_trimmed_primary_key_fields()
-
-    from typing import List
-
     def cross_partition_update(self) -> bool:
         if not self.primary_keys or not self.partition_keys:
             return False
@@ -96,7 +80,7 @@ class TableSchema:
         partition_keys: List[str] = schema.partition_keys
         primary_keys: List[str] = schema.primary_keys
         options: Dict[str, str] = schema.options
-        highest_field_id: int = -1  # max(field.id for field in fields)
+        highest_field_id: int = max(field.id for field in fields)
 
         return TableSchema(
             TableSchema.CURRENT_VERSION,
@@ -106,8 +90,7 @@ class TableSchema:
             partition_keys,
             primary_keys,
             options,
-            schema.comment,
-            int(time.time())
+            schema.comment
         )
 
     @staticmethod
@@ -150,22 +133,6 @@ class TableSchema:
         except Exception as e:
             raise RuntimeError(f"Failed to parse schema from JSON: {e}") from e
 
-    def to_json(self) -> str:
-        data = {
-            TableSchema.FIELD_VERSION: self.version,
-            TableSchema.FIELD_ID: self.id,
-            TableSchema.FIELD_FIELDS: [field.to_dict() for field in self.fields],
-            TableSchema.FIELD_HIGHEST_FIELD_ID: self.highest_field_id,
-            TableSchema.FIELD_PARTITION_KEYS: self.partition_keys,
-            TableSchema.FIELD_PRIMARY_KEYS: self.primary_keys,
-            TableSchema.FIELD_OPTIONS: self.options,
-            TableSchema.FIELD_COMMENT: self.comment,
-            TableSchema.FIELD_TIME_MILLIS: self.time_millis
-        }
-        if self.comment is not None:
-            data["comment"] = self.comment
-        return json.dumps(data, indent=2, ensure_ascii=False)
-
     def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
         return TableSchema(
             version=self.version,
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 1d131a492d..eb763d2938 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -49,10 +49,11 @@ class FileStoreTable(Table):
         self.primary_keys = table_schema.primary_keys
         self.partition_keys = table_schema.partition_keys
         self.options = table_schema.options
+        self.cross_partition_update = self.table_schema.cross_partition_update()
+        self.is_primary_key_table = bool(self.primary_keys)
+        self.total_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
 
         self.schema_manager = SchemaManager(file_io, table_path)
-        self.is_primary_key_table = bool(self.primary_keys)
-        self.cross_partition_update = self.table_schema.cross_partition_update()
 
     def current_branch(self) -> str:
         """Get the current branch name from options."""
diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py
index 2e44ff417e..f1f8e740df 100644
--- a/paimon-python/pypaimon/table/row/binary_row.py
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -193,7 +193,7 @@ class BinaryRowDeserializer:
             return bytes_data[base_offset + sub_offset:base_offset + sub_offset + length]
         else:
             length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
-            return bytes_data[field_offset + 1:field_offset + 1 + length]
+            return bytes_data[field_offset:field_offset + length]
 
     @classmethod
     def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
@@ -238,8 +238,6 @@ class BinaryRowDeserializer:
 class BinaryRowSerializer:
     HEADER_SIZE_IN_BITS = 8
     MAX_FIX_PART_DATA_SIZE = 7
-    HIGHEST_FIRST_BIT = 0x80 << 56
-    HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
 
     @classmethod
     def to_bytes(cls, binary_row: BinaryRow) -> bytes:
@@ -252,79 +250,49 @@ class BinaryRowSerializer:
         fixed_part = bytearray(fixed_part_size)
         fixed_part[0] = binary_row.row_kind.value
 
-        for i, value in enumerate(binary_row.values):
-            if value is None:
-                cls._set_null_bit(fixed_part, 0, i)
-
-        variable_data = []
-        variable_offsets = []
-        current_offset = fixed_part_size
+        variable_part_data = []
+        current_variable_offset = 0
 
         for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)):
+            field_fixed_offset = null_bits_size_in_bytes + i * 8
+
             if value is None:
-                struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, 0)
-                variable_data.append(b'')
-                variable_offsets.append(0)
+                cls._set_null_bit(fixed_part, 0, i)
+                struct.pack_into('<q', fixed_part, field_fixed_offset, 0)
                 continue
 
-            field_offset = null_bits_size_in_bytes + i * 8
             if not isinstance(field.type, AtomicType):
                 raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}")
-            if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']:
-                if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR']:
-                    if isinstance(value, str):
-                        value_bytes = value.encode('utf-8')
-                    else:
-                        value_bytes = bytes(value)
+
+            type_name = field.type.type.upper()
+            if type_name in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']:
+                if type_name in ['VARCHAR', 'STRING', 'CHAR']:
+                    value_bytes = str(value).encode('utf-8')
                 else:
-                    if isinstance(value, bytes):
-                        value_bytes = value
-                    else:
-                        value_bytes = bytes(value)
+                    value_bytes = bytes(value)
 
                 length = len(value_bytes)
                 if length <= cls.MAX_FIX_PART_DATA_SIZE:
-                    fixed_part[field_offset:field_offset + length] = value_bytes
-                    for j in range(length, 8):
-                        fixed_part[field_offset + j] = 0
-                    packed_long = struct.unpack_from('<q', fixed_part, field_offset)[0]
-
-                    offset_and_len = packed_long | (length << 56) | cls.HIGHEST_FIRST_BIT
-                    if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-                        offset_and_len = offset_and_len - 0x10000000000000000
-                    struct.pack_into('<q', fixed_part, field_offset, offset_and_len)
-                    variable_data.append(b'')
-                    variable_offsets.append(0)
+                    fixed_part[field_fixed_offset: field_fixed_offset + length] = value_bytes
+                    for j in range(length, 7):
+                        fixed_part[field_fixed_offset + j] = 0
+                    header_byte = 0x80 | length
+                    fixed_part[field_fixed_offset + 7] = header_byte
                 else:
-                    variable_data.append(value_bytes)
-                    variable_offsets.append(current_offset)
-                    current_offset += len(value_bytes)
-                    offset_and_len = (variable_offsets[i] << 32) | len(variable_data[i])
-                    struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, offset_and_len)
-            else:
-                if field.type.type.upper() in ['BOOLEAN', 'BOOL']:
-                    struct.pack_into('<b', fixed_part, field_offset, 1 if value else 0)
-                elif field.type.type.upper() in ['TINYINT', 'BYTE']:
-                    struct.pack_into('<b', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['SMALLINT', 'SHORT']:
-                    struct.pack_into('<h', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['INT', 'INTEGER']:
-                    struct.pack_into('<i', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['BIGINT', 'LONG']:
-                    struct.pack_into('<q', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['FLOAT', 'REAL']:
-                    struct.pack_into('<f', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['DOUBLE']:
-                    struct.pack_into('<d', fixed_part, field_offset, value)
-                else:
-                    field_bytes = cls._serialize_field_value(value, field.type)
-                    fixed_part[field_offset:field_offset + len(field_bytes)] = field_bytes
+                    offset_in_variable_part = current_variable_offset
+                    variable_part_data.append(value_bytes)
+                    current_variable_offset += length
 
-                variable_data.append(b'')
-                variable_offsets.append(0)
+                    absolute_offset = fixed_part_size + offset_in_variable_part
+                    offset_and_len = (absolute_offset << 32) | length
+                    struct.pack_into('<q', fixed_part, field_fixed_offset, offset_and_len)
+            else:
+                field_bytes = cls._serialize_field_value(value, field.type)
+                fixed_part[field_fixed_offset: field_fixed_offset + len(field_bytes)] = field_bytes
 
-        result = bytes(fixed_part) + b''.join(variable_data)
-        return result
+        row_data = bytes(fixed_part) + b''.join(variable_part_data)
+        arity_prefix = struct.pack('>i', arity)
+        return arity_prefix + row_data
 
     @classmethod
     def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
@@ -342,33 +310,29 @@ class BinaryRowSerializer:
         type_name = data_type.type.upper()
 
         if type_name in ['BOOLEAN', 'BOOL']:
-            return cls._serialize_boolean(value)
+            return cls._serialize_boolean(value) + b'\x00' * 7
         elif type_name in ['TINYINT', 'BYTE']:
-            return cls._serialize_byte(value)
+            return cls._serialize_byte(value) + b'\x00' * 7
         elif type_name in ['SMALLINT', 'SHORT']:
-            return cls._serialize_short(value)
+            return cls._serialize_short(value) + b'\x00' * 6
         elif type_name in ['INT', 'INTEGER']:
-            return cls._serialize_int(value)
+            return cls._serialize_int(value) + b'\x00' * 4
         elif type_name in ['BIGINT', 'LONG']:
             return cls._serialize_long(value)
         elif type_name in ['FLOAT', 'REAL']:
-            return cls._serialize_float(value)
+            return cls._serialize_float(value) + b'\x00' * 4
         elif type_name in ['DOUBLE']:
             return cls._serialize_double(value)
-        elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
-            return cls._serialize_string(value)
-        elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
-            return cls._serialize_binary(value)
         elif type_name in ['DECIMAL', 'NUMERIC']:
             return cls._serialize_decimal(value, data_type)
         elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
             return cls._serialize_timestamp(value)
         elif type_name in ['DATE']:
-            return cls._serialize_date(value)
+            return cls._serialize_date(value) + b'\x00' * 4
         elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
-            return cls._serialize_time(value)
+            return cls._serialize_time(value) + b'\x00' * 4
         else:
-            return cls._serialize_string(str(value))
+            raise TypeError(f"Unsupported type for serialization: {type_name}")
 
     @classmethod
     def _serialize_boolean(cls, value: bool) -> bytes:
@@ -398,32 +362,6 @@ class BinaryRowSerializer:
     def _serialize_double(cls, value: float) -> bytes:
         return struct.pack('<d', value)
 
-    @classmethod
-    def _serialize_string(cls, value) -> bytes:
-        if isinstance(value, str):
-            value_bytes = value.encode('utf-8')
-        else:
-            value_bytes = bytes(value)
-
-        length = len(value_bytes)
-
-        offset_and_len = (0x80 << 56) | (length << 56)
-        if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-            offset_and_len = offset_and_len - 0x10000000000000000
-        return struct.pack('<q', offset_and_len)
-
-    @classmethod
-    def _serialize_binary(cls, value: bytes) -> bytes:
-        if isinstance(value, bytes):
-            data_bytes = value
-        else:
-            data_bytes = bytes(value)
-        length = len(data_bytes)
-        offset_and_len = (0x80 << 56) | (length << 56)
-        if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-            offset_and_len = offset_and_len - 0x10000000000000000
-        return struct.pack('<q', offset_and_len)
-
     @classmethod
     def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
         type_str = str(data_type)
@@ -452,11 +390,10 @@ class BinaryRowSerializer:
     @classmethod
     def _serialize_date(cls, value: datetime) -> bytes:
         if isinstance(value, datetime):
-            epoch = datetime(1970, 1, 1)
-            days = (value - epoch).days
+            epoch = datetime(1970, 1, 1).date()
+            days = (value.date() - epoch).days
         else:
-            epoch = datetime(1970, 1, 1)
-            days = (value - epoch).days
+            raise RuntimeError("date should be datatime")
         return struct.pack('<i', days)
 
     @classmethod
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 4671e32996..b9b115dfa4 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -38,10 +38,10 @@ class PkReaderTest(unittest.TestCase):
         cls.catalog.create_database('default', False)
 
         cls.pa_schema = pa.schema([
-            ('user_id', pa.int32()),
+            pa.field('user_id', pa.int32(), nullable=False),
             ('item_id', pa.int64()),
             ('behavior', pa.string()),
-            ('dt', pa.string())
+            pa.field('dt', pa.string(), nullable=False)
         ])
         cls.expected = pa.Table.from_pydict({
             'user_id': [1, 2, 3, 4, 5, 7, 8],
diff --git a/paimon-python/pypaimon/tests/rest_catalog_base_test.py b/paimon-python/pypaimon/tests/rest_catalog_base_test.py
index 773d94abed..c035957ccc 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_base_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_base_test.py
@@ -186,7 +186,7 @@ class RESTCatalogBaseTest(unittest.TestCase):
         self.assertTrue(os.path.exists(self.warehouse + "/default/test_table/snapshot/snapshot-1"))
         self.assertTrue(os.path.exists(self.warehouse + "/default/test_table/manifest"))
         self.assertTrue(os.path.exists(self.warehouse + "/default/test_table/dt=p1"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_table/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_table/manifest/*")), 3)
 
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py
index d0486ec02c..c326f3525d 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -495,8 +495,8 @@ class RESTCatalogServer:
 
     def _write_snapshot_files(self, identifier: Identifier, snapshot, statistics):
         """Write snapshot and related files to the file system"""
-        import os
         import json
+        import os
         import uuid
 
         # Construct table path: {warehouse}/{database}/{table}
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py b/paimon-python/pypaimon/tests/rest_table_test.py
index 9c64c17003..b905fa593b 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_test.py
@@ -168,7 +168,7 @@ class RESTTableTest(RESTCatalogBaseTest):
         self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/LATEST"))
         self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/snapshot/snapshot-1"))
         self.assertTrue(os.path.exists(self.warehouse + "/default/test_postpone/manifest"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*")), 3)
         self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
 
     def test_postpone_read_write(self):
diff --git a/paimon-python/pypaimon/tests/schema_test.py b/paimon-python/pypaimon/tests/schema_test.py
index daa9442e50..766676e95f 100644
--- a/paimon-python/pypaimon/tests/schema_test.py
+++ b/paimon-python/pypaimon/tests/schema_test.py
@@ -32,7 +32,8 @@ class SchemaTestCase(unittest.TestCase):
             DataField(0, "name", AtomicType('INT'), 'desc  name'),
             DataField(1, "arr", ArrayType(True, AtomicType('INT')), 'desc arr1'),
             DataField(2, "map1",
-                      MapType(False, AtomicType('INT'), MapType(False, AtomicType('INT'), AtomicType('INT'))),
+                      MapType(False, AtomicType('INT', False),
+                              MapType(False, AtomicType('INT', False), AtomicType('INT', False))),
                       'desc map1'),
         ]
         table_schema = TableSchema(TableSchema.CURRENT_VERSION, len(data_fields), data_fields,
diff --git a/paimon-python/pypaimon/tests/writer_test.py b/paimon-python/pypaimon/tests/writer_test.py
index c0c242d2c4..8bf38f72e0 100644
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -71,7 +71,7 @@ class WriterTest(unittest.TestCase):
         self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/snapshot/snapshot-1"))
         self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/manifest"))
         self.assertTrue(os.path.exists(self.warehouse + "/test_db.db/test_table/bucket-0"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*")), 3)
         self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
 
         with open(self.warehouse + '/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
diff --git a/paimon-python/pypaimon/write/commit_message.py b/paimon-python/pypaimon/write/commit_message.py
index 1a2177abb1..4e4c0f0b48 100644
--- a/paimon-python/pypaimon/write/commit_message.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -15,31 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-
+from dataclasses import dataclass
 from typing import List, Tuple
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 
 
+@dataclass
 class CommitMessage:
-    """Python implementation of CommitMessage"""
-
-    def __init__(self, partition: Tuple, bucket: int, new_files: List[DataFileMeta]):
-        self._partition = partition
-        self._bucket = bucket
-        self._new_files = new_files or []
-
-    def partition(self) -> Tuple:
-        """Get the partition of this commit message."""
-        return self._partition
-
-    def bucket(self) -> int:
-        """Get the bucket of this commit message."""
-        return self._bucket
-
-    def new_files(self) -> List[DataFileMeta]:
-        """Get the list of new files."""
-        return self._new_files
+    partition: Tuple
+    bucket: int
+    new_files: List[DataFileMeta]
 
     def is_empty(self):
-        return not self._new_files
+        return not self.new_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py b/paimon-python/pypaimon/write/file_store_commit.py
index e98cb8610f..ca26981333 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -17,14 +17,18 @@
 ################################################################################
 
 import time
+import uuid
 from pathlib import Path
 from typing import List
 
 from pypaimon.catalog.snapshot_commit import PartitionStatistics, SnapshotCommit
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.binary_row import BinaryRow
 from pypaimon.write.commit_message import CommitMessage
 
 
@@ -55,39 +59,69 @@ class FileStoreCommit:
         if not commit_messages:
             return
 
-        new_manifest_files = self.manifest_file_manager.write(commit_messages)
-        if not new_manifest_files:
-            return
+        unique_id = uuid.uuid4()
+        base_manifest_list = f"manifest-list-{unique_id}-0"
+        delta_manifest_list = f"manifest-list-{unique_id}-1"
+
+        # process new_manifest
+        new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
+        self.manifest_file_manager.write(new_manifest_file, commit_messages)
+
+        partition_columns = list(zip(*(msg.partition for msg in commit_messages)))
+        partition_min_stats = [min(col) for col in partition_columns]
+        partition_max_stats = [max(col) for col in partition_columns]
+        partition_null_counts = [sum(value == 0 for value in col) for col in partition_columns]
+        if not all(count == 0 for count in partition_null_counts):
+            raise RuntimeError("Partition value should not be null")
+
+        new_manifest_list = ManifestFileMeta(
+            file_name=new_manifest_file,
+            file_size=self.table.file_io.get_file_size(self.manifest_file_manager.manifest_path / new_manifest_file),
+            num_added_files=sum(len(msg.new_files) for msg in commit_messages),
+            num_deleted_files=0,
+            partition_stats=SimpleStats(
+                min_value=BinaryRow(
+                    values=partition_min_stats,
+                    fields=self.table.table_schema.get_partition_key_fields(),
+                ),
+                max_value=BinaryRow(
+                    values=partition_max_stats,
+                    fields=self.table.table_schema.get_partition_key_fields(),
+                ),
+                null_count=partition_null_counts,
+            ),
+            schema_id=self.table.table_schema.id,
+        )
+        self.manifest_list_manager.write(delta_manifest_list, [new_manifest_list])
 
+        # process existing_manifest
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
-        existing_manifest_files = []
-        record_count_add = self._generate_record_count_add(commit_messages)
-        total_record_count = record_count_add
-
+        total_record_count = 0
         if latest_snapshot:
-            existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+            existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
             previous_record_count = latest_snapshot.total_record_count
             if previous_record_count:
                 total_record_count += previous_record_count
+        else:
+            existing_manifest_files = []
+        self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
 
-        new_manifest_files.extend(existing_manifest_files)
-        manifest_list = self.manifest_list_manager.write(new_manifest_files)
-
+        # process snapshot
         new_snapshot_id = self._generate_snapshot_id()
+        record_count_add = self._generate_record_count_add(commit_messages)
+        total_record_count += record_count_add
         snapshot_data = Snapshot(
-            version=3,
+            version=1,
             id=new_snapshot_id,
-            schema_id=0,
-            base_manifest_list=manifest_list,
-            delta_manifest_list=manifest_list,
+            schema_id=self.table.table_schema.id,
+            base_manifest_list=base_manifest_list,
+            delta_manifest_list=delta_manifest_list,
             total_record_count=total_record_count,
             delta_record_count=record_count_add,
             commit_user=self.commit_user,
             commit_identifier=commit_identifier,
             commit_kind="APPEND",
             time_millis=int(time.time() * 1000),
-            log_offsets={},
         )
 
         # Generate partition statistics for the commit
@@ -101,46 +135,11 @@ class FileStoreCommit:
 
     def overwrite(self, partition, commit_messages: List[CommitMessage], commit_identifier: int):
         """Commit the given commit messages in overwrite mode."""
-        if not commit_messages:
-            return
-
-        new_manifest_files = self.manifest_file_manager.write(commit_messages)
-        if not new_manifest_files:
-            return
-
-        # In overwrite mode, we don't merge with existing manifests
-        manifest_list = self.manifest_list_manager.write(new_manifest_files)
-
-        record_count_add = self._generate_record_count_add(commit_messages)
-
-        new_snapshot_id = self._generate_snapshot_id()
-        snapshot_data = Snapshot(
-            version=3,
-            id=new_snapshot_id,
-            schema_id=0,
-            base_manifest_list=manifest_list,
-            delta_manifest_list=manifest_list,
-            total_record_count=record_count_add,
-            delta_record_count=record_count_add,
-            commit_user=self.commit_user,
-            commit_identifier=commit_identifier,
-            commit_kind="OVERWRITE",
-            time_millis=int(time.time() * 1000),
-            log_offsets={},
-        )
-
-        # Generate partition statistics for the commit
-        statistics = self._generate_partition_statistics(commit_messages)
-
-        # Use SnapshotCommit for atomic commit
-        with self.snapshot_commit:
-            success = self.snapshot_commit.commit(snapshot_data, self.table.current_branch(), statistics)
-            if not success:
-                raise RuntimeError(f"Failed to commit snapshot {new_snapshot_id}")
+        raise RuntimeError("overwrite unsupported yet")
 
     def abort(self, commit_messages: List[CommitMessage]):
         for message in commit_messages:
-            for file in message.new_files():
+            for file in message.new_files:
                 try:
                     file_path_obj = Path(file.file_path)
                     if file_path_obj.exists():
@@ -179,7 +178,7 @@ class FileStoreCommit:
 
         for message in commit_messages:
             # Convert partition tuple to dictionary for PartitionStatistics
-            partition_value = message.partition()  # Call the method to get partition value
+            partition_value = message.partition  # Access the dataclass attribute to get the partition value
             if partition_value:
                 # Assuming partition is a tuple and we need to convert it to a dict
                 # This may need adjustment based on actual partition format
@@ -213,8 +212,7 @@ class FileStoreCommit:
 
             # Process each file in the commit message
             # Following Java implementation: PartitionEntry.fromDataFile()
-            new_files = message.new_files()
-            for file_meta in new_files:
+            for file_meta in message.new_files:
                 # Extract actual file metadata (following Java DataFileMeta pattern)
                 record_count = file_meta.row_count
                 file_size_in_bytes = file_meta.file_size
@@ -266,7 +264,7 @@ class FileStoreCommit:
         record_count = 0
 
         for message in commit_messages:
-            new_files = message.new_files()
+            new_files = message.new_files
             for file_meta in new_files:
                 record_count += file_meta.row_count
 
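
Note on the statistics logic added to commit() above: the partition tuples of
all commit messages are transposed with zip, then per-column min/max (plus a
null-count check) are taken. A small worked sketch with made-up two-field
partitions:

    partitions = [("2024-01-01", 1), ("2024-01-02", 2)]    # msg.partition values (illustrative)
    partition_columns = list(zip(*partitions))             # [('2024-01-01', '2024-01-02'), (1, 2)]
    partition_min_stats = [min(col) for col in partition_columns]   # ['2024-01-01', 1]
    partition_max_stats = [max(col) for col in partition_columns]   # ['2024-01-02', 2]
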
diff --git a/paimon-python/pypaimon/write/file_store_write.py b/paimon-python/pypaimon/write/file_store_write.py
index 3bf6150b03..bcef10a4c3 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
 from typing import Dict, List, Tuple
 
 import pyarrow as pa
@@ -34,6 +33,7 @@ class FileStoreWrite:
 
         self.table: FileStoreTable = table
         self.data_writers: Dict[Tuple, DataWriter] = {}
+        self.max_seq_numbers = self._seq_number_stats()  # TODO: build this on demand instead of for all partitions up front
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
@@ -48,12 +48,14 @@ class FileStoreWrite:
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
+                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
             )
         else:
             return AppendOnlyDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
+                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
             )
 
     def prepare_commit(self) -> List[CommitMessage]:
@@ -74,3 +76,33 @@ class FileStoreWrite:
         for writer in self.data_writers.values():
             writer.close()
         self.data_writers.clear()
+
+    def _seq_number_stats(self) -> dict:
+        from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+        from pypaimon.manifest.manifest_list_manager import ManifestListManager
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+        snapshot_manager = SnapshotManager(self.table)
+        manifest_list_manager = ManifestListManager(self.table)
+        manifest_file_manager = ManifestFileManager(self.table)
+
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        if not latest_snapshot:
+            return {}
+        manifest_files = manifest_list_manager.read_all(latest_snapshot)
+
+        file_entries = []
+        for manifest_file in manifest_files:
+            manifest_entries = manifest_file_manager.read(manifest_file.file_name)
+            for entry in manifest_entries:
+                if entry.kind == 0:
+                    file_entries.append(entry)
+
+        max_seq_numbers = {}
+        for entry in file_entries:
+            partition_key = (tuple(entry.partition.values), entry.bucket)
+            current_seq_num = entry.file.max_sequence_number
+            existing_max = max_seq_numbers.get(partition_key, -1)
+            if current_seq_num > existing_max:
+                max_seq_numbers[partition_key] = current_seq_num
+        return max_seq_numbers
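
Note on _seq_number_stats() above: it scans the manifests of the latest
snapshot once and returns a dict keyed by (partition tuple, bucket), which
the writer-creation path uses to seed each new DataWriter. Sketch of the
lookup (values are illustrative):

    max_seq_numbers = {(("2024-01-01",), 0): 42}             # shape returned by _seq_number_stats()
    partition, bucket = ("2024-01-01",), 0
    start = max_seq_numbers.get((partition, bucket), 1)      # 42 if the bucket was seen before, else 1
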
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index c11d991b84..5d9641718c 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -15,16 +15,18 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
 import uuid
 from abc import ABC, abstractmethod
+from datetime import datetime
 from pathlib import Path
 from typing import List, Optional, Tuple
 
 import pyarrow as pa
+import pyarrow.compute as pc
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.row.binary_row import BinaryRow
 
@@ -32,7 +34,7 @@ from pypaimon.table.row.binary_row import BinaryRow
 class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
-    def __init__(self, table, partition: Tuple, bucket: int):
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -50,6 +52,7 @@ class DataWriter(ABC):
                                        if self.bucket != BucketMode.POSTPONE_BUCKET.value
                                        else CoreOptions.FILE_FORMAT_AVRO)
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+        self.sequence_generator = SequenceGenerator(max_seq_number)
 
         self.pending_data: Optional[pa.RecordBatch] = None
         self.committed_files: List[DataFileMeta] = []
@@ -89,7 +92,7 @@ class DataWriter(ABC):
 
         current_size = self.pending_data.get_total_buffer_size()
         if current_size > self.target_file_size:
-            split_row = _find_optimal_split_point(self.pending_data, self.target_file_size)
+            split_row = self._find_optimal_split_point(self.pending_data, self.target_file_size)
             if split_row > 0:
                 data_to_write = self.pending_data.slice(0, split_row)
                 remaining_data = self.pending_data.slice(split_row)
@@ -101,7 +104,7 @@ class DataWriter(ABC):
     def _write_data_to_file(self, data: pa.RecordBatch):
         if data.num_rows == 0:
             return
-        file_name = f"data-{uuid.uuid4()}.{self.file_format}"
+        file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
         file_path = self._generate_file_path(file_name)
         if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
             self.file_io.write_parquet(file_path, data, compression=self.compression)
@@ -112,24 +115,55 @@ class DataWriter(ABC):
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
+        # min key & max key
         key_columns_batch = data.select(self.trimmed_primary_key)
         min_key_row_batch = key_columns_batch.slice(0, 1)
-        min_key_data = [col.to_pylist()[0] for col in min_key_row_batch.columns]
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
-        max_key_data = [col.to_pylist()[0] for col in max_key_row_batch.columns]
+        min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns]
+        max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
+
+        # key stats & value stats
+        column_stats = {
+            field.name: self._get_column_stats(data, field.name)
+            for field in self.table.table_schema.fields
+        }
+        all_fields = self.table.table_schema.fields
+        min_value_stats = [column_stats[field.name]['min_value'] for field in all_fields]
+        max_value_stats = [column_stats[field.name]['max_value'] for field in all_fields]
+        value_null_counts = [column_stats[field.name]['null_count'] for field in all_fields]
+        key_fields = self.trimmed_primary_key_fields
+        min_key_stats = [column_stats[field.name]['min_value'] for field in key_fields]
+        max_key_stats = [column_stats[field.name]['max_value'] for field in key_fields]
+        key_null_counts = [column_stats[field.name]['null_count'] for field in key_fields]
+        if not all(count == 0 for count in key_null_counts):
+            raise RuntimeError("Primary key should not be null")
+
+        min_seq = self.sequence_generator.start
+        max_seq = self.sequence_generator.current
+        self.sequence_generator.start = self.sequence_generator.current
         self.committed_files.append(DataFileMeta(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
             row_count=data.num_rows,
-            min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields),
-            max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields),
-            key_stats=None,  # TODO
-            value_stats=None,
-            min_sequence_number=0,
-            max_sequence_number=0,
-            schema_id=0,
+            min_key=BinaryRow(min_key, self.trimmed_primary_key_fields),
+            max_key=BinaryRow(max_key, self.trimmed_primary_key_fields),
+            key_stats=SimpleStats(
+                BinaryRow(min_key_stats, self.trimmed_primary_key_fields),
+                BinaryRow(max_key_stats, self.trimmed_primary_key_fields),
+                key_null_counts,
+            ),
+            value_stats=SimpleStats(
+                BinaryRow(min_value_stats, self.table.table_schema.fields),
+                BinaryRow(max_value_stats, self.table.table_schema.fields),
+                value_null_counts,
+            ),
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
+            schema_id=self.table.table_schema.id,
             level=0,
-            extra_files=None,
+            extra_files=[],
+            creation_time=datetime.now(),
+            delete_row_count=0,
             file_path=str(file_path),
         ))
 
@@ -146,24 +180,52 @@ class DataWriter(ABC):
 
         return path_builder
 
-
-def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
-    total_rows = data.num_rows
-    if total_rows <= 1:
-        return 0
-
-    left, right = 1, total_rows
-    best_split = 0
-
-    while left <= right:
-        mid = (left + right) // 2
-        slice_data = data.slice(0, mid)
-        slice_size = slice_data.get_total_buffer_size()
-
-        if slice_size <= target_size:
-            best_split = mid
-            left = mid + 1
-        else:
-            right = mid - 1
-
-    return best_split
+    @staticmethod
+    def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
+        total_rows = data.num_rows
+        if total_rows <= 1:
+            return 0
+
+        left, right = 1, total_rows
+        best_split = 0
+
+        while left <= right:
+            mid = (left + right) // 2
+            slice_data = data.slice(0, mid)
+            slice_size = slice_data.get_total_buffer_size()
+
+            if slice_size <= target_size:
+                best_split = mid
+                left = mid + 1
+            else:
+                right = mid - 1
+
+        return best_split
+
+    @staticmethod
+    def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> dict:
+        column_array = record_batch.column(column_name)
+        if column_array.null_count == len(column_array):
+            return {
+                "min_value": None,
+                "max_value": None,
+                "null_count": column_array.null_count,
+            }
+        min_value = pc.min(column_array).as_py()
+        max_value = pc.max(column_array).as_py()
+        null_count = column_array.null_count
+        return {
+            "min_value": min_value,
+            "max_value": max_value,
+            "null_count": null_count,
+        }
+
+
+class SequenceGenerator:
+    def __init__(self, start: int = 0):
+        self.start = start
+        self.current = start
+
+    def next(self) -> int:
+        self.current += 1
+        return self.current
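
Note on _get_column_stats above: it relies only on pyarrow.compute
aggregations plus the array's null_count. The same computation as a
self-contained sketch (column name and data are illustrative):

    import pyarrow as pa
    import pyarrow.compute as pc

    batch = pa.RecordBatch.from_pydict({"user_id": [3, None, 7]})
    col = batch.column("user_id")
    stats = {
        "min_value": pc.min(col).as_py(),   # 3
        "max_value": pc.max(col).as_py(),   # 7
        "null_count": col.null_count,       # 1
    }
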
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index dbf47496c9..99b11a9788 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -22,18 +22,6 @@ import pyarrow.compute as pc
 from pypaimon.write.writer.data_writer import DataWriter
 
 
-class SequenceGenerator:
-    def __init__(self, start: int = 0):
-        self.current = start
-
-    def next(self) -> int:
-        self.current += 1
-        return self.current
-
-
-sequence_generator = SequenceGenerator()
-
-
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""
 
@@ -55,11 +43,11 @@ class KeyValueDataWriter(DataWriter):
                 key_column = data.column(pk_key)
                 enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
 
-        sequence_column = pa.array([sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
+        sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
         enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column)
 
         # TODO: support real row kind here
-        value_kind_column = pa.repeat(0, num_rows)
+        value_kind_column = pa.array([0] * num_rows, type=pa.int32())
         enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
                                                    value_kind_column)
 
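
Note on the KeyValueDataWriter change above: the SequenceGenerator is now
owned by each writer and seeded from max_seq_number, so sequence numbers
continue from the previous snapshot instead of restarting from a shared
module-level counter. Rough sketch (the start value is illustrative):

    import pyarrow as pa
    from pypaimon.write.writer.data_writer import SequenceGenerator

    gen = SequenceGenerator(start=42)
    sequence_column = pa.array([gen.next() for _ in range(3)], type=pa.int64())  # [43, 44, 45]
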
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 507844685c..a00a2ac487 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -28,7 +28,8 @@ install_requires = [
     'ossfs==2023.12.0',
     'pyarrow==16.0.0',
     'polars==1.32.0',
-    'fastavro==1.11.1'
+    'fastavro==1.11.1',
+    'zstandard==0.24.0'
 ]
 
 long_description = "See Apache Paimon Python API \
