This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 32cb6854db [Python] Enhanced Manifest Statistics & Hybrid Java/Python Writer Tests (#6119)
32cb6854db is described below

commit 32cb6854dbe33af86cf85ca47ecb11309dd032fd
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Aug 27 11:36:21 2025 +0800

    [Python] Enhanced Manifest Statistics & Hybrid Java/Python Writer Tests (#6119)
---
 .github/workflows/paimon-python-checks.yml         |   2 +-
 .../pypaimon/catalog/filesystem_catalog.py         |   4 +-
 .../pypaimon/catalog/renaming_snapshot_commit.py   |   2 +-
 .../pypaimon/manifest/manifest_file_manager.py     |  92 +++--
 .../pypaimon/manifest/manifest_list_manager.py     |  73 ++--
 .../pypaimon/manifest/schema/data_file_meta.py     |  36 +-
 .../pypaimon/manifest/schema/manifest_entry.py     |   1 +
 .../pypaimon/manifest/schema/manifest_file_meta.py |   5 +-
 .../pypaimon/manifest/schema/simple_stats.py       |  17 +-
 .../read/reader/data_file_record_reader.py         |  16 +-
 paimon-python/pypaimon/read/split_read.py          |   6 +-
 paimon-python/pypaimon/read/table_scan.py          |   6 +-
 paimon-python/pypaimon/schema/data_types.py        |  13 +-
 paimon-python/pypaimon/schema/schema_manager.py    |  11 +-
 paimon-python/pypaimon/schema/table_schema.py      |  37 +-
 paimon-python/pypaimon/table/file_store_table.py   |   5 +-
 paimon-python/pypaimon/table/row/binary_row.py     | 145 +++-----
 .../pypaimon/tests/java_python_hybrid_test.py      | 132 +++++++
 .../py4j_impl/__init__.py}                         |  22 --
 .../py4j_impl/constants.py}                        |  29 +-
 .../pypaimon/tests/py4j_impl/gateway_factory.py    | 135 ++++++++
 .../pypaimon/tests/py4j_impl/gateway_server.py     | 122 +++++++
 .../tests/py4j_impl/java_implementation.py         | 383 +++++++++++++++++++++
 .../pypaimon/tests/py4j_impl/java_utils.py         | 132 +++++++
 .../flink-shaded-hadoop-2-uber-2.8.3-10.0.jar      | Bin 0 -> 43317025 bytes
 .../paimon-python-java-bridge-0.9-SNAPSHOT.jar     | Bin 0 -> 43384210 bytes
 .../pypaimon/tests/reader_primary_key_test.py      |   4 +-
 .../pypaimon/tests/rest_catalog_base_test.py       |   2 +-
 paimon-python/pypaimon/tests/rest_server.py        |   2 +-
 paimon-python/pypaimon/tests/rest_table_test.py    |   2 +-
 paimon-python/pypaimon/tests/schema_test.py        |   3 +-
 paimon-python/pypaimon/tests/writer_test.py        |   2 +-
 paimon-python/pypaimon/write/commit_message.py     |  26 +-
 paimon-python/pypaimon/write/file_store_commit.py  | 114 +++---
 paimon-python/pypaimon/write/file_store_write.py   |  34 +-
 paimon-python/pypaimon/write/writer/data_writer.py | 132 +++++--
 .../pypaimon/write/writer/key_value_data_writer.py |  16 +-
 paimon-python/setup.py                             |   3 +-
 38 files changed, 1334 insertions(+), 432 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 9a38e9b5c2..6f6a8a23a8 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -46,7 +46,7 @@ jobs:
           python-version: ${{ env.PYTHON_VERSION }}
       - name: Install dependencies
         run: |
-          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 requests 2>&1 >/dev/null
+          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
       - name: Run lint-python.sh
         run: |
           chmod +x paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/catalog/filesystem_catalog.py b/paimon-python/pypaimon/catalog/filesystem_catalog.py
index 68d1e438c8..1aed6e8137 100644
--- a/paimon-python/pypaimon/catalog/filesystem_catalog.py
+++ b/paimon-python/pypaimon/catalog/filesystem_catalog.py
@@ -17,7 +17,7 @@
 
#################################################################################
 
 from pathlib import Path
-from typing import Optional, Union, List
+from typing import List, Optional, Union
 from urllib.parse import urlparse
 
 from pypaimon.catalog.catalog import Catalog
@@ -67,7 +67,7 @@ class FileSystemCatalog(Catalog):
         if not isinstance(identifier, Identifier):
             identifier = Identifier.from_string(identifier)
         if CoreOptions.SCAN_FALLBACK_BRANCH in self.catalog_options:
-            raise ValueError(CoreOptions.SCAN_FALLBACK_BRANCH)
+            raise ValueError(f"Unsupported CoreOption {CoreOptions.SCAN_FALLBACK_BRANCH}")
         table_path = self.get_table_path(identifier)
         table_schema = self.get_table_schema(identifier)
 
diff --git a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
index 4a2e70e4e2..b85c4200a1 100644
--- a/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
+++ b/paimon-python/pypaimon/catalog/renaming_snapshot_commit.py
@@ -63,7 +63,7 @@ class RenamingSnapshotCommit(SnapshotCommit):
         if not self.file_io.exists(new_snapshot_path):
             """Internal function to perform the actual commit."""
             # Try to write atomically using the file IO
-            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot))
+            committed = self.file_io.try_to_write_atomic(new_snapshot_path, JSON.to_json(snapshot, indent=2))
             if committed:
                 # Update the latest hint
                 self._commit_latest_hint(snapshot.id)
diff --git a/paimon-python/pypaimon/manifest/manifest_file_manager.py b/paimon-python/pypaimon/manifest/manifest_file_manager.py
index ef674e5b88..7c46b368d2 100644
--- a/paimon-python/pypaimon/manifest/manifest_file_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_file_manager.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-import uuid
 from io import BytesIO
 from typing import List
 
@@ -24,8 +23,10 @@ import fastavro
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 from pypaimon.manifest.schema.manifest_entry import (MANIFEST_ENTRY_SCHEMA,
                                                      ManifestEntry)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.row.binary_row import (BinaryRow, BinaryRowDeserializer,
                                            BinaryRowSerializer)
+from pypaimon.write.commit_message import CommitMessage
 
 
 class ManifestFileManager:
@@ -51,20 +52,40 @@ class ManifestFileManager:
         reader = fastavro.reader(buffer)
 
         for record in reader:
-            file_info = dict(record['_FILE'])
+            file_dict = dict(record['_FILE'])
+            key_dict = dict(file_dict['_KEY_STATS'])
+            key_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(key_dict['_MIN_VALUES'],
+                                                           self.trimmed_primary_key_fields),
+                max_value=BinaryRowDeserializer.from_bytes(key_dict['_MAX_VALUES'],
+                                                           self.trimmed_primary_key_fields),
+                null_count=key_dict['_NULL_COUNTS'],
+            )
+            value_dict = dict(file_dict['_VALUE_STATS'])
+            value_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(value_dict['_MIN_VALUES'],
+                                                           self.table.table_schema.fields),
+                max_value=BinaryRowDeserializer.from_bytes(value_dict['_MAX_VALUES'],
+                                                           self.table.table_schema.fields),
+                null_count=value_dict['_NULL_COUNTS'],
+            )
             file_meta = DataFileMeta(
-                file_name=file_info['_FILE_NAME'],
-                file_size=file_info['_FILE_SIZE'],
-                row_count=file_info['_ROW_COUNT'],
-                min_key=BinaryRowDeserializer.from_bytes(file_info['_MIN_KEY'], self.trimmed_primary_key_fields),
-                max_key=BinaryRowDeserializer.from_bytes(file_info['_MAX_KEY'], self.trimmed_primary_key_fields),
-                key_stats=None,  # TODO
-                value_stats=None,  # TODO
-                min_sequence_number=file_info['_MIN_SEQUENCE_NUMBER'],
-                max_sequence_number=file_info['_MAX_SEQUENCE_NUMBER'],
-                schema_id=file_info['_SCHEMA_ID'],
-                level=file_info['_LEVEL'],
-                extra_files=None,  # TODO
+                file_name=file_dict['_FILE_NAME'],
+                file_size=file_dict['_FILE_SIZE'],
+                row_count=file_dict['_ROW_COUNT'],
+                min_key=BinaryRowDeserializer.from_bytes(file_dict['_MIN_KEY'], self.trimmed_primary_key_fields),
+                max_key=BinaryRowDeserializer.from_bytes(file_dict['_MAX_KEY'], self.trimmed_primary_key_fields),
+                key_stats=key_stats,
+                value_stats=value_stats,
+                min_sequence_number=file_dict['_MIN_SEQUENCE_NUMBER'],
+                max_sequence_number=file_dict['_MAX_SEQUENCE_NUMBER'],
+                schema_id=file_dict['_SCHEMA_ID'],
+                level=file_dict['_LEVEL'],
+                extra_files=file_dict['_EXTRA_FILES'],
+                creation_time=file_dict['_CREATION_TIME'],
+                delete_row_count=file_dict['_DELETE_ROW_COUNT'],
+                embedded_index=file_dict['_EMBEDDED_FILE_INDEX'],
+                file_source=file_dict['_FILE_SOURCE'],
             )
             entry = ManifestEntry(
                 kind=record['_KIND'],
@@ -73,22 +94,23 @@ class ManifestFileManager:
                 total_buckets=record['_TOTAL_BUCKETS'],
                 file=file_meta
             )
-            if not shard_filter(entry):
+            if shard_filter is not None and not shard_filter(entry):
                 continue
             entries.append(entry)
         return entries
 
-    def write(self, commit_messages: List['CommitMessage']) -> List[str]:
+    def write(self, file_name, commit_messages: List[CommitMessage]):
         avro_records = []
         for message in commit_messages:
             partition_bytes = BinaryRowSerializer.to_bytes(
-                BinaryRow(list(message.partition()), self.table.table_schema.get_partition_key_fields()))
-            for file in message.new_files():
+                BinaryRow(list(message.partition), self.table.table_schema.get_partition_key_fields()))
+            for file in message.new_files:
                 avro_record = {
+                    "_VERSION": 2,
                     "_KIND": 0,
                     "_PARTITION": partition_bytes,
-                    "_BUCKET": message.bucket(),
-                    "_TOTAL_BUCKETS": -1,  # TODO
+                    "_BUCKET": message.bucket,
+                    "_TOTAL_BUCKETS": self.table.total_buckets,
                     "_FILE": {
                         "_FILE_NAME": file.file_name,
                         "_FILE_SIZE": file.file_size,
@@ -96,33 +118,35 @@ class ManifestFileManager:
                         "_MIN_KEY": BinaryRowSerializer.to_bytes(file.min_key),
                         "_MAX_KEY": BinaryRowSerializer.to_bytes(file.max_key),
                         "_KEY_STATS": {
-                            "_MIN_VALUES": None,
-                            "_MAX_VALUES": None,
-                            "_NULL_COUNTS": 0,
+                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.min_value),
+                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.key_stats.max_value),
+                            "_NULL_COUNTS": file.key_stats.null_count,
                         },
                         "_VALUE_STATS": {
-                            "_MIN_VALUES": None,
-                            "_MAX_VALUES": None,
-                            "_NULL_COUNTS": 0,
+                            "_MIN_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.min_value),
+                            "_MAX_VALUES": BinaryRowSerializer.to_bytes(file.value_stats.max_value),
+                            "_NULL_COUNTS": file.value_stats.null_count,
                         },
-                        "_MIN_SEQUENCE_NUMBER": 0,
-                        "_MAX_SEQUENCE_NUMBER": 0,
-                        "_SCHEMA_ID": 0,
-                        "_LEVEL": 0,
-                        "_EXTRA_FILES": [],
+                        "_MIN_SEQUENCE_NUMBER": file.min_sequence_number,
+                        "_MAX_SEQUENCE_NUMBER": file.max_sequence_number,
+                        "_SCHEMA_ID": file.schema_id,
+                        "_LEVEL": file.level,
+                        "_EXTRA_FILES": file.extra_files,
+                        "_CREATION_TIME": file.creation_time,
+                        "_DELETE_ROW_COUNT": file.delete_row_count,
+                        "_EMBEDDED_FILE_INDEX": file.embedded_index,
+                        "_FILE_SOURCE": file.file_source,
                     }
                 }
                 avro_records.append(avro_record)
 
-        manifest_filename = f"manifest-{str(uuid.uuid4())}.avro"
-        manifest_path = self.manifest_path / manifest_filename
+        manifest_path = self.manifest_path / file_name
         try:
             buffer = BytesIO()
             fastavro.writer(buffer, MANIFEST_ENTRY_SCHEMA, avro_records)
             avro_bytes = buffer.getvalue()
             with self.file_io.new_output_stream(manifest_path) as output_stream:
                 output_stream.write(avro_bytes)
-            return [str(manifest_filename)]
         except Exception as e:
             self.file_io.delete_quietly(manifest_path)
             raise RuntimeError(f"Failed to write manifest file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/manifest_list_manager.py b/paimon-python/pypaimon/manifest/manifest_list_manager.py
index a338441465..65fd2b21ac 100644
--- a/paimon-python/pypaimon/manifest/manifest_list_manager.py
+++ b/paimon-python/pypaimon/manifest/manifest_list_manager.py
@@ -16,15 +16,17 @@
 # limitations under the License.
 
################################################################################
 
-import uuid
 from io import BytesIO
-from typing import List, Optional
+from typing import List
 
 import fastavro
 
-from pypaimon.manifest.schema.manifest_file_meta import \
-    MANIFEST_FILE_META_SCHEMA
+from pypaimon.manifest.schema.manifest_file_meta import (
+    MANIFEST_FILE_META_SCHEMA, ManifestFileMeta)
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.snapshot.snapshot import Snapshot
+from pypaimon.table.row.binary_row import (BinaryRowDeserializer,
+                                           BinaryRowSerializer)
 
 
 class ManifestListManager:
@@ -37,57 +39,72 @@ class ManifestListManager:
         self.manifest_path = self.table.table_path / "manifest"
         self.file_io = self.table.file_io
 
-    def read_all_manifest_files(self, snapshot: Snapshot) -> List[str]:
+    def read_all(self, snapshot: Snapshot) -> List[ManifestFileMeta]:
         manifest_files = []
         base_manifests = self.read(snapshot.base_manifest_list)
         manifest_files.extend(base_manifests)
         delta_manifests = self.read(snapshot.delta_manifest_list)
         manifest_files.extend(delta_manifests)
-        return list(set(manifest_files))
+        return manifest_files
 
-    def read(self, manifest_list_name: str) -> List[str]:
-        manifest_list_path = self.manifest_path / manifest_list_name
-        manifest_paths = []
+    def read(self, manifest_list_name: str) -> List[ManifestFileMeta]:
+        manifest_files = []
 
+        manifest_list_path = self.manifest_path / manifest_list_name
         with self.file_io.new_input_stream(manifest_list_path) as input_stream:
             avro_bytes = input_stream.read()
         buffer = BytesIO(avro_bytes)
         reader = fastavro.reader(buffer)
         for record in reader:
-            file_name = record['_FILE_NAME']
-            manifest_paths.append(file_name)
-
-        return manifest_paths
+            stats_dict = dict(record['_PARTITION_STATS'])
+            partition_stats = SimpleStats(
+                min_value=BinaryRowDeserializer.from_bytes(
+                    stats_dict['_MIN_VALUES'],
+                    self.table.table_schema.get_partition_key_fields()
+                ),
+                max_value=BinaryRowDeserializer.from_bytes(
+                    stats_dict['_MAX_VALUES'],
+                    self.table.table_schema.get_partition_key_fields()
+                ),
+                null_count=stats_dict['_NULL_COUNTS'],
+            )
+            manifest_file_meta = ManifestFileMeta(
+                file_name=record['_FILE_NAME'],
+                file_size=record['_FILE_SIZE'],
+                num_added_files=record['_NUM_ADDED_FILES'],
+                num_deleted_files=record['_NUM_DELETED_FILES'],
+                partition_stats=partition_stats,
+                schema_id=record['_SCHEMA_ID'],
+            )
+            manifest_files.append(manifest_file_meta)
 
-    def write(self, manifest_file_names: List[str]) -> Optional[str]:
-        if not manifest_file_names:
-            return None
+        return manifest_files
 
+    def write(self, file_name, manifest_file_metas: List[ManifestFileMeta]):
         avro_records = []
-        for manifest_file_name in manifest_file_names:
+        for meta in manifest_file_metas:
             avro_record = {
-                "_FILE_NAME": manifest_file_name,
-                "_FILE_SIZE": 0,  # TODO
-                "_NUM_ADDED_FILES": 0,
-                "_NUM_DELETED_FILES": 0,
+                "_VERSION": 2,
+                "_FILE_NAME": meta.file_name,
+                "_FILE_SIZE": meta.file_size,
+                "_NUM_ADDED_FILES": meta.num_added_files,
+                "_NUM_DELETED_FILES": meta.num_deleted_files,
                 "_PARTITION_STATS": {
-                    "_MIN_VALUES": None,
-                    "_MAX_VALUES": None,
-                    "_NULL_COUNTS": 0,
+                    "_MIN_VALUES": BinaryRowSerializer.to_bytes(meta.partition_stats.min_value),
+                    "_MAX_VALUES": BinaryRowSerializer.to_bytes(meta.partition_stats.max_value),
+                    "_NULL_COUNTS": meta.partition_stats.null_count,
                 },
-                "_SCHEMA_ID": 0,
+                "_SCHEMA_ID": meta.schema_id,
             }
             avro_records.append(avro_record)
 
-        list_filename = f"manifest-list-{str(uuid.uuid4())}.avro"
-        list_path = self.manifest_path / list_filename
+        list_path = self.manifest_path / file_name
         try:
             buffer = BytesIO()
             fastavro.writer(buffer, MANIFEST_FILE_META_SCHEMA, avro_records)
             avro_bytes = buffer.getvalue()
             with self.file_io.new_output_stream(list_path) as output_stream:
                 output_stream.write(avro_bytes)
-            return list_filename
         except Exception as e:
             self.file_io.delete_quietly(list_path)
             raise RuntimeError(f"Failed to write manifest list file: {e}") from e
diff --git a/paimon-python/pypaimon/manifest/schema/data_file_meta.py b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
index 02a7179218..e1f60bf2e1 100644
--- a/paimon-python/pypaimon/manifest/schema/data_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/data_file_meta.py
@@ -19,7 +19,7 @@
 from dataclasses import dataclass
 from datetime import datetime
 from pathlib import Path
-from typing import List, Optional
+from typing import List
 
 from pypaimon.manifest.schema.simple_stats import (SIMPLE_STATS_SCHEMA,
                                                    SimpleStats)
@@ -31,34 +31,32 @@ class DataFileMeta:
     file_name: str
     file_size: int
     row_count: int
-    min_key: Optional[BinaryRow]
-    max_key: Optional[BinaryRow]
-    key_stats: Optional[SimpleStats]
-    value_stats: Optional[SimpleStats]
+    min_key: BinaryRow
+    max_key: BinaryRow
+    key_stats: SimpleStats
+    value_stats: SimpleStats
     min_sequence_number: int
     max_sequence_number: int
     schema_id: int
     level: int
-    extra_files: Optional[List[str]]
+    extra_files: List[str]
 
-    creation_time: Optional[datetime] = None
-    delete_row_count: Optional[int] = None
-    embedded_index: Optional[bytes] = None
-    file_source: Optional[str] = None
-    value_stats_cols: Optional[List[str]] = None
-    external_path: Optional[str] = None
+    creation_time: datetime | None = None
+    delete_row_count: int | None = None
+    embedded_index: bytes | None = None
+    file_source: str | None = None
+    value_stats_cols: List[str] | None = None
+    external_path: str | None = None
 
+    # not a schema field, just for internal usage
     file_path: str = None
 
     def set_file_path(self, table_path: Path, partition: BinaryRow, bucket: int):
         path_builder = table_path
-
         partition_dict = partition.to_dict()
         for field_name, field_value in partition_dict.items():
             path_builder = path_builder / (field_name + "=" + str(field_value))
-
         path_builder = path_builder / ("bucket-" + str(bucket)) / self.file_name
-
         self.file_path = str(path_builder)
 
 
@@ -78,11 +76,13 @@ DATA_FILE_META_SCHEMA = {
         {"name": "_SCHEMA_ID", "type": "long"},
         {"name": "_LEVEL", "type": "int"},
         {"name": "_EXTRA_FILES", "type": {"type": "array", "items": "string"}},
-        {"name": "_CREATION_TIME", "type": ["null", "long"], "default": None},
+        {"name": "_CREATION_TIME",
+         "type": [
+             "null",
+             {"type": "long", "logicalType": "timestamp-millis"}],
+         "default": None},
         {"name": "_DELETE_ROW_COUNT", "type": ["null", "long"], "default": None},
         {"name": "_EMBEDDED_FILE_INDEX", "type": ["null", "bytes"], "default": None},
         {"name": "_FILE_SOURCE", "type": ["null", "int"], "default": None},
-        {"name": "_VALUE_STATS_COLS", "type": ["null", {"type": "array", "items": "string"}], "default": None},
-        {"name": "_EXTERNAL_PATH", "type": ["null", "string"], "default": None},
     ]
 }
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_entry.py b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
index 1a80553188..75a51f30c5 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_entry.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_entry.py
@@ -36,6 +36,7 @@ MANIFEST_ENTRY_SCHEMA = {
     "type": "record",
     "name": "ManifestEntry",
     "fields": [
+        {"name": "_VERSION", "type": "int"},
         {"name": "_KIND", "type": "int"},
         {"name": "_PARTITION", "type": "bytes"},
         {"name": "_BUCKET", "type": "int"},
diff --git a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
index a001fca4c2..443c6a0944 100644
--- a/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
+++ b/paimon-python/pypaimon/manifest/schema/manifest_file_meta.py
@@ -30,16 +30,13 @@ class ManifestFileMeta:
     num_deleted_files: int
     partition_stats: SimpleStats
     schema_id: int
-    min_bucket: int
-    max_bucket: int
-    min_level: int
-    max_level: int
 
 
 MANIFEST_FILE_META_SCHEMA = {
     "type": "record",
     "name": "ManifestFileMeta",
     "fields": [
+        {"name": "_VERSION", "type": "int"},
         {"name": "_FILE_NAME", "type": "string"},
         {"name": "_FILE_SIZE", "type": "long"},
         {"name": "_NUM_ADDED_FILES", "type": "long"},
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/manifest/schema/simple_stats.py
index b291c12852..dd6924fb2e 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/manifest/schema/simple_stats.py
@@ -17,6 +17,7 @@
 
################################################################################
 
 from dataclasses import dataclass
+from typing import List
 
 from pypaimon.table.row.binary_row import BinaryRow
 
@@ -25,15 +26,23 @@ from pypaimon.table.row.binary_row import BinaryRow
 class SimpleStats:
     min_value: BinaryRow
     max_value: BinaryRow
-    null_count: int
+    null_count: List[int] | None
 
 
 SIMPLE_STATS_SCHEMA = {
     "type": "record",
     "name": "SimpleStats",
     "fields": [
-        {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
+        {"name": "_MIN_VALUES", "type": "bytes"},
+        {"name": "_MAX_VALUES", "type": "bytes"},
+        {"name": "_NULL_COUNTS",
+         "type": [
+             "null",
+             {
+                 "type": "array",
+                 "items": ["null", "long"]
+             }
+         ],
+         "default": None},
     ]
 }
diff --git a/paimon-python/pypaimon/read/reader/data_file_record_reader.py b/paimon-python/pypaimon/read/reader/data_file_record_reader.py
index 41c1bf7517..c83f1ce152 100644
--- a/paimon-python/pypaimon/read/reader/data_file_record_reader.py
+++ b/paimon-python/pypaimon/read/reader/data_file_record_reader.py
@@ -23,6 +23,7 @@ from pyarrow import RecordBatch
 
 from pypaimon.read.partition_info import PartitionInfo
 from pypaimon.read.reader.iface.record_batch_reader import RecordBatchReader
+from pypaimon.schema.data_types import DataField, PyarrowFieldParser
 
 
 class DataFileBatchReader(RecordBatchReader):
@@ -31,11 +32,12 @@ class DataFileBatchReader(RecordBatchReader):
     """
 
     def __init__(self, format_reader: RecordBatchReader, index_mapping: List[int], partition_info: PartitionInfo,
-                 system_primary_key: Optional[List[str]]):
+                 system_primary_key: Optional[List[str]], fields: List[DataField]):
         self.format_reader = format_reader
         self.index_mapping = index_mapping
         self.partition_info = partition_info
         self.system_primary_key = system_primary_key
+        self.schema_map = {field.name: field for field in PyarrowFieldParser.from_paimon_schema(fields)}
 
     def read_arrow_batch(self) -> Optional[RecordBatch]:
         record_batch = self.format_reader.read_arrow_batch()
@@ -85,7 +87,17 @@ class DataFileBatchReader(RecordBatchReader):
             inter_arrays = mapped_arrays
             inter_names = mapped_names
 
-        return pa.RecordBatch.from_arrays(inter_arrays, names=inter_names)
+        # to contains 'not null' property
+        final_fields = []
+        for i, name in enumerate(inter_names):
+            array = inter_arrays[i]
+            target_field = self.schema_map.get(name)
+            if not target_field:
+                target_field = pa.field(name, array.type)
+            final_fields.append(target_field)
+        final_schema = pa.schema(final_fields)
+
+        return pa.RecordBatch.from_arrays(inter_arrays, schema=final_schema)
 
     def close(self) -> None:
         self.format_reader.close()
diff --git a/paimon-python/pypaimon/read/split_read.py b/paimon-python/pypaimon/read/split_read.py
index ea37593fdc..f085bac444 100644
--- a/paimon-python/pypaimon/read/split_read.py
+++ b/paimon-python/pypaimon/read/split_read.py
@@ -83,9 +83,11 @@ class SplitRead(ABC):
         index_mapping = self.create_index_mapping()
         partition_info = self.create_partition_info()
         if for_merge_read:
-            return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key)
+            return DataFileBatchReader(format_reader, index_mapping, partition_info, self.trimmed_primary_key,
+                                       self.table.table_schema.fields)
         else:
-            return DataFileBatchReader(format_reader, index_mapping, partition_info, None)
+            return DataFileBatchReader(format_reader, index_mapping, partition_info, None,
+                                       self.table.table_schema.fields)
 
     @abstractmethod
     def _get_all_data_fields(self):
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 2745b1d1b3..d89ddd0bcb 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -63,11 +63,11 @@ class TableScan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
         if not latest_snapshot:
             return Plan([])
-        manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+        manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
 
         file_entries = []
-        for manifest_file_path in manifest_files:
-            manifest_entries = self.manifest_file_manager.read(manifest_file_path,
+        for manifest_file in manifest_files:
+            manifest_entries = self.manifest_file_manager.read(manifest_file.file_name,
                                                                lambda row: self._bucket_filter(row))
             for entry in manifest_entries:
                 if entry.kind == 0:
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 8a22e7f012..a5186cc56c 100644
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -279,13 +279,16 @@ class DataTypeParser:
 
         if "(" in type_upper:
             base_type = type_upper.split("(")[0]
+        elif " " in type_upper:
+            base_type = type_upper.split(" ")[0]
+            type_upper = base_type
         else:
             base_type = type_upper
 
         try:
             Keyword(base_type)
             return AtomicType(
-                type_string, DataTypeParser.parse_nullability(type_string)
+                type_upper, DataTypeParser.parse_nullability(type_string)
             )
         except ValueError:
             raise Exception(f"Unknown type: {base_type}")
@@ -345,11 +348,7 @@ class DataTypeParser:
     def parse_data_field(
             json_data: Dict[str, Any], field_id: Optional[AtomicInteger] = None
     ) -> DataField:
-
-        if (
-                DataField.FIELD_ID in json_data
-                and json_data[DataField.FIELD_ID] is not None
-        ):
+        if DataField.FIELD_ID in json_data and json_data[DataField.FIELD_ID] is not None:
             if field_id is not None and field_id.get() != -1:
                 raise ValueError("Partial field id is not allowed.")
             field_id_value = int(json_data["id"])
@@ -486,7 +485,7 @@ class PyarrowFieldParser:
             return MapType(nullable, key_type, value_type)
         else:
             raise ValueError(f"Unknown type: {type_name}")
-        return AtomicType(type_name)
+        return AtomicType(type_name, nullable)
 
     @staticmethod
     def to_paimon_field(field_idx: int, pa_field: pyarrow.Field) -> DataField:
diff --git a/paimon-python/pypaimon/schema/schema_manager.py b/paimon-python/pypaimon/schema/schema_manager.py
index f0a108befb..f03b9e111b 100644
--- a/paimon-python/pypaimon/schema/schema_manager.py
+++ b/paimon-python/pypaimon/schema/schema_manager.py
@@ -19,6 +19,7 @@ from pathlib import Path
 from typing import Optional
 
 from pypaimon.common.file_io import FileIO
+from pypaimon.common.rest_json import JSON
 from pypaimon.schema.schema import Schema
 from pypaimon.schema.table_schema import TableSchema
 
@@ -42,15 +43,11 @@ class SchemaManager:
         except Exception as e:
             raise RuntimeError(f"Failed to load schema from path: {self.schema_path}") from e
 
-    def create_table(self, schema: Schema, external_table: bool = False) -> TableSchema:
+    def create_table(self, schema: Schema) -> TableSchema:
         while True:
             latest = self.latest()
             if latest is not None:
-                if external_table:
-                    self._check_schema_for_external_table(latest.to_schema(), schema)
-                    return latest
-                else:
-                    raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
+                raise RuntimeError("Schema in filesystem exists, creation is not allowed.")
 
             table_schema = TableSchema.from_schema(schema_id=0, schema=schema)
             success = self.commit(table_schema)
@@ -60,7 +57,7 @@ class SchemaManager:
     def commit(self, new_schema: TableSchema) -> bool:
         schema_path = self._to_schema_path(new_schema.id)
         try:
-            return self.file_io.try_to_write_atomic(schema_path, new_schema.to_json())
+            return self.file_io.try_to_write_atomic(schema_path, JSON.to_json(new_schema, indent=2))
         except Exception as e:
             raise RuntimeError(f"Failed to commit schema: {e}") from e
 
diff --git a/paimon-python/pypaimon/schema/table_schema.py b/paimon-python/pypaimon/schema/table_schema.py
index 80b787eb6e..dc1872deb6 100644
--- a/paimon-python/pypaimon/schema/table_schema.py
+++ b/paimon-python/pypaimon/schema/table_schema.py
@@ -57,22 +57,6 @@ class TableSchema:
     comment: Optional[str] = json_field(FIELD_COMMENT, default=None)
     time_millis: int = json_field("timeMillis", default_factory=lambda: int(time.time() * 1000))
 
-    def __init__(self, version: int, id: int, fields: List[DataField], highest_field_id: int,
-                 partition_keys: List[str], primary_keys: List[str], options: Dict[str, str],
-                 comment: Optional[str] = None, time_millis: Optional[int] = None):
-        self.version = version
-        self.id = id
-        self.fields = fields
-        self.highest_field_id = highest_field_id
-        self.partition_keys = partition_keys or []
-        self.primary_keys = primary_keys or []
-        self.options = options or {}
-        self.comment = comment
-        self.time_millis = time_millis if time_millis is not None else int(time.time() * 1000)
-        self.get_trimmed_primary_key_fields()
-
-    from typing import List
-
     def cross_partition_update(self) -> bool:
         if not self.primary_keys or not self.partition_keys:
             return False
@@ -96,7 +80,7 @@ class TableSchema:
         partition_keys: List[str] = schema.partition_keys
         primary_keys: List[str] = schema.primary_keys
         options: Dict[str, str] = schema.options
-        highest_field_id: int = -1  # max(field.id for field in fields)
+        highest_field_id: int = max(field.id for field in fields)
 
         return TableSchema(
             TableSchema.CURRENT_VERSION,
@@ -106,8 +90,7 @@ class TableSchema:
             partition_keys,
             primary_keys,
             options,
-            schema.comment,
-            int(time.time())
+            schema.comment
         )
 
     @staticmethod
@@ -150,22 +133,6 @@ class TableSchema:
         except Exception as e:
             raise RuntimeError(f"Failed to parse schema from JSON: {e}") from e
 
-    def to_json(self) -> str:
-        data = {
-            TableSchema.FIELD_VERSION: self.version,
-            TableSchema.FIELD_ID: self.id,
-            TableSchema.FIELD_FIELDS: [field.to_dict() for field in self.fields],
-            TableSchema.FIELD_HIGHEST_FIELD_ID: self.highest_field_id,
-            TableSchema.FIELD_PARTITION_KEYS: self.partition_keys,
-            TableSchema.FIELD_PRIMARY_KEYS: self.primary_keys,
-            TableSchema.FIELD_OPTIONS: self.options,
-            TableSchema.FIELD_COMMENT: self.comment,
-            TableSchema.FIELD_TIME_MILLIS: self.time_millis
-        }
-        if self.comment is not None:
-            data["comment"] = self.comment
-        return json.dumps(data, indent=2, ensure_ascii=False)
-
     def copy(self, new_options: Optional[Dict[str, str]] = None) -> "TableSchema":
         return TableSchema(
             version=self.version,
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 1d131a492d..eb763d2938 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -49,10 +49,11 @@ class FileStoreTable(Table):
         self.primary_keys = table_schema.primary_keys
         self.partition_keys = table_schema.partition_keys
         self.options = table_schema.options
+        self.cross_partition_update = self.table_schema.cross_partition_update()
+        self.is_primary_key_table = bool(self.primary_keys)
+        self.total_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -1))
 
         self.schema_manager = SchemaManager(file_io, table_path)
-        self.is_primary_key_table = bool(self.primary_keys)
-        self.cross_partition_update = self.table_schema.cross_partition_update()
 
     def current_branch(self) -> str:
         """Get the current branch name from options."""
diff --git a/paimon-python/pypaimon/table/row/binary_row.py b/paimon-python/pypaimon/table/row/binary_row.py
index 2e44ff417e..f1f8e740df 100644
--- a/paimon-python/pypaimon/table/row/binary_row.py
+++ b/paimon-python/pypaimon/table/row/binary_row.py
@@ -193,7 +193,7 @@ class BinaryRowDeserializer:
             return bytes_data[base_offset + sub_offset:base_offset + sub_offset + length]
         else:
             length = (offset_and_len & cls.HIGHEST_SECOND_TO_EIGHTH_BIT) >> 56
-            return bytes_data[field_offset + 1:field_offset + 1 + length]
+            return bytes_data[field_offset:field_offset + length]
 
     @classmethod
     def _parse_decimal(cls, bytes_data: bytes, base_offset: int, field_offset: int, data_type: DataType) -> Decimal:
@@ -238,8 +238,6 @@ class BinaryRowDeserializer:
 class BinaryRowSerializer:
     HEADER_SIZE_IN_BITS = 8
     MAX_FIX_PART_DATA_SIZE = 7
-    HIGHEST_FIRST_BIT = 0x80 << 56
-    HIGHEST_SECOND_TO_EIGHTH_BIT = 0x7F << 56
 
     @classmethod
     def to_bytes(cls, binary_row: BinaryRow) -> bytes:
@@ -252,79 +250,49 @@ class BinaryRowSerializer:
         fixed_part = bytearray(fixed_part_size)
         fixed_part[0] = binary_row.row_kind.value
 
-        for i, value in enumerate(binary_row.values):
-            if value is None:
-                cls._set_null_bit(fixed_part, 0, i)
-
-        variable_data = []
-        variable_offsets = []
-        current_offset = fixed_part_size
+        variable_part_data = []
+        current_variable_offset = 0
 
         for i, (value, field) in enumerate(zip(binary_row.values, binary_row.fields)):
+            field_fixed_offset = null_bits_size_in_bytes + i * 8
+
             if value is None:
-                struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, 0)
-                variable_data.append(b'')
-                variable_offsets.append(0)
+                cls._set_null_bit(fixed_part, 0, i)
+                struct.pack_into('<q', fixed_part, field_fixed_offset, 0)
                 continue
 
-            field_offset = null_bits_size_in_bytes + i * 8
             if not isinstance(field.type, AtomicType):
                 raise ValueError(f"BinaryRow only support AtomicType yet, meet {field.type.__class__}")
-            if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']:
-                if field.type.type.upper() in ['VARCHAR', 'STRING', 'CHAR']:
-                    if isinstance(value, str):
-                        value_bytes = value.encode('utf-8')
-                    else:
-                        value_bytes = bytes(value)
+
+            type_name = field.type.type.upper()
+            if type_name in ['VARCHAR', 'STRING', 'CHAR', 'BINARY', 'VARBINARY', 'BYTES']:
+                if type_name in ['VARCHAR', 'STRING', 'CHAR']:
+                    value_bytes = str(value).encode('utf-8')
                 else:
-                    if isinstance(value, bytes):
-                        value_bytes = value
-                    else:
-                        value_bytes = bytes(value)
+                    value_bytes = bytes(value)
 
                 length = len(value_bytes)
                 if length <= cls.MAX_FIX_PART_DATA_SIZE:
-                    fixed_part[field_offset:field_offset + length] = value_bytes
-                    for j in range(length, 8):
-                        fixed_part[field_offset + j] = 0
-                    packed_long = struct.unpack_from('<q', fixed_part, field_offset)[0]
-
-                    offset_and_len = packed_long | (length << 56) | cls.HIGHEST_FIRST_BIT
-                    if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-                        offset_and_len = offset_and_len - 0x10000000000000000
-                    struct.pack_into('<q', fixed_part, field_offset, offset_and_len)
-                    variable_data.append(b'')
-                    variable_offsets.append(0)
+                    fixed_part[field_fixed_offset: field_fixed_offset + length] = value_bytes
+                    for j in range(length, 7):
+                        fixed_part[field_fixed_offset + j] = 0
+                    header_byte = 0x80 | length
+                    fixed_part[field_fixed_offset + 7] = header_byte
                 else:
-                    variable_data.append(value_bytes)
-                    variable_offsets.append(current_offset)
-                    current_offset += len(value_bytes)
-                    offset_and_len = (variable_offsets[i] << 32) | len(variable_data[i])
-                    struct.pack_into('<q', fixed_part, null_bits_size_in_bytes + i * 8, offset_and_len)
-            else:
-                if field.type.type.upper() in ['BOOLEAN', 'BOOL']:
-                    struct.pack_into('<b', fixed_part, field_offset, 1 if value else 0)
-                elif field.type.type.upper() in ['TINYINT', 'BYTE']:
-                    struct.pack_into('<b', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['SMALLINT', 'SHORT']:
-                    struct.pack_into('<h', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['INT', 'INTEGER']:
-                    struct.pack_into('<i', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['BIGINT', 'LONG']:
-                    struct.pack_into('<q', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['FLOAT', 'REAL']:
-                    struct.pack_into('<f', fixed_part, field_offset, value)
-                elif field.type.type.upper() in ['DOUBLE']:
-                    struct.pack_into('<d', fixed_part, field_offset, value)
-                else:
-                    field_bytes = cls._serialize_field_value(value, field.type)
-                    fixed_part[field_offset:field_offset + len(field_bytes)] = field_bytes
+                    offset_in_variable_part = current_variable_offset
+                    variable_part_data.append(value_bytes)
+                    current_variable_offset += length
 
-                variable_data.append(b'')
-                variable_offsets.append(0)
+                    absolute_offset = fixed_part_size + offset_in_variable_part
+                    offset_and_len = (absolute_offset << 32) | length
+                    struct.pack_into('<q', fixed_part, field_fixed_offset, offset_and_len)
+            else:
+                field_bytes = cls._serialize_field_value(value, field.type)
+                fixed_part[field_fixed_offset: field_fixed_offset + len(field_bytes)] = field_bytes
 
-        result = bytes(fixed_part) + b''.join(variable_data)
-        return result
+        row_data = bytes(fixed_part) + b''.join(variable_part_data)
+        arity_prefix = struct.pack('>i', arity)
+        return arity_prefix + row_data
 
     @classmethod
     def _calculate_bit_set_width_in_bytes(cls, arity: int) -> int:
@@ -342,33 +310,29 @@ class BinaryRowSerializer:
         type_name = data_type.type.upper()
 
         if type_name in ['BOOLEAN', 'BOOL']:
-            return cls._serialize_boolean(value)
+            return cls._serialize_boolean(value) + b'\x00' * 7
         elif type_name in ['TINYINT', 'BYTE']:
-            return cls._serialize_byte(value)
+            return cls._serialize_byte(value) + b'\x00' * 7
         elif type_name in ['SMALLINT', 'SHORT']:
-            return cls._serialize_short(value)
+            return cls._serialize_short(value) + b'\x00' * 6
         elif type_name in ['INT', 'INTEGER']:
-            return cls._serialize_int(value)
+            return cls._serialize_int(value) + b'\x00' * 4
         elif type_name in ['BIGINT', 'LONG']:
             return cls._serialize_long(value)
         elif type_name in ['FLOAT', 'REAL']:
-            return cls._serialize_float(value)
+            return cls._serialize_float(value) + b'\x00' * 4
         elif type_name in ['DOUBLE']:
             return cls._serialize_double(value)
-        elif type_name in ['VARCHAR', 'STRING', 'CHAR']:
-            return cls._serialize_string(value)
-        elif type_name in ['BINARY', 'VARBINARY', 'BYTES']:
-            return cls._serialize_binary(value)
         elif type_name in ['DECIMAL', 'NUMERIC']:
             return cls._serialize_decimal(value, data_type)
         elif type_name in ['TIMESTAMP', 'TIMESTAMP_WITHOUT_TIME_ZONE']:
             return cls._serialize_timestamp(value)
         elif type_name in ['DATE']:
-            return cls._serialize_date(value)
+            return cls._serialize_date(value) + b'\x00' * 4
         elif type_name in ['TIME', 'TIME_WITHOUT_TIME_ZONE']:
-            return cls._serialize_time(value)
+            return cls._serialize_time(value) + b'\x00' * 4
         else:
-            return cls._serialize_string(str(value))
+            raise TypeError(f"Unsupported type for serialization: {type_name}")
 
     @classmethod
     def _serialize_boolean(cls, value: bool) -> bytes:
@@ -398,32 +362,6 @@ class BinaryRowSerializer:
     def _serialize_double(cls, value: float) -> bytes:
         return struct.pack('<d', value)
 
-    @classmethod
-    def _serialize_string(cls, value) -> bytes:
-        if isinstance(value, str):
-            value_bytes = value.encode('utf-8')
-        else:
-            value_bytes = bytes(value)
-
-        length = len(value_bytes)
-
-        offset_and_len = (0x80 << 56) | (length << 56)
-        if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-            offset_and_len = offset_and_len - 0x10000000000000000
-        return struct.pack('<q', offset_and_len)
-
-    @classmethod
-    def _serialize_binary(cls, value: bytes) -> bytes:
-        if isinstance(value, bytes):
-            data_bytes = value
-        else:
-            data_bytes = bytes(value)
-        length = len(data_bytes)
-        offset_and_len = (0x80 << 56) | (length << 56)
-        if offset_and_len > 0x7FFFFFFFFFFFFFFF:
-            offset_and_len = offset_and_len - 0x10000000000000000
-        return struct.pack('<q', offset_and_len)
-
     @classmethod
     def _serialize_decimal(cls, value: Decimal, data_type: DataType) -> bytes:
         type_str = str(data_type)
@@ -452,11 +390,10 @@ class BinaryRowSerializer:
     @classmethod
     def _serialize_date(cls, value: datetime) -> bytes:
         if isinstance(value, datetime):
-            epoch = datetime(1970, 1, 1)
-            days = (value - epoch).days
+            epoch = datetime(1970, 1, 1).date()
+            days = (value.date() - epoch).days
         else:
-            epoch = datetime(1970, 1, 1)
-            days = (value - epoch).days
+            raise RuntimeError("date should be datatime")
         return struct.pack('<i', days)
 
     @classmethod
diff --git a/paimon-python/pypaimon/tests/java_python_hybrid_test.py b/paimon-python/pypaimon/tests/java_python_hybrid_test.py
new file mode 100644
index 0000000000..0b36236ba5
--- /dev/null
+++ b/paimon-python/pypaimon/tests/java_python_hybrid_test.py
@@ -0,0 +1,132 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import glob
+import os
+import shutil
+import tempfile
+import unittest
+
+import pyarrow as pa
+
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.schema import Schema
+from pypaimon.tests.py4j_impl import constants
+from pypaimon.tests.py4j_impl.java_implementation import CatalogPy4j
+
+
+class AlternativeWriteTest(unittest.TestCase):
+    @classmethod
+    def setUpClass(cls):
+        # for py4j env
+        this_dir = os.path.abspath(os.path.dirname(__file__))
+        project_dir = os.path.dirname(this_dir)
+        deps_dir = os.path.join(project_dir, "tests/py4j_impl/test_deps/*")
+        print(f"py4j deps_dir: {deps_dir}")
+        for file in glob.glob(deps_dir):
+            print(f"py4j deps_dir file: {file}")
+        os.environ[constants.PYPAIMON_HADOOP_CLASSPATH] = deps_dir
+        os.environ[constants.PYPAIMON4J_TEST_MODE] = 'true'
+
+        # for default catalog
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.option = {
+            'warehouse': cls.warehouse
+        }
+        cls.py_catalog = CatalogFactory.create(cls.option)
+        cls.j_catalog = CatalogPy4j.create(cls.option)
+
+        cls.pa_schema = pa.schema([
+            pa.field('user_id', pa.int32(), nullable=False),
+            ('item_id', pa.int64()),
+            ('behavior', pa.string()),
+            pa.field('dt', pa.string(), nullable=False)
+        ])
+        cls.expected_data = {
+            'user_id': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12],
+            'item_id': [1001, 2001, 3001, 4007, 5007, 6007, 7007, 8007, 9007, 1006, 1106, 1206],
+            'behavior': ['l', 'k', 'j', 'i-new', None, 'g-new', 'f-new', 'e-new', 'd-new', 'c-new', 'b-new', 'a-new'],
+            'dt': ['p1', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1', 'p2', 'p1'],
+        }
+        cls.expected_result = pa.Table.from_pydict(cls.expected_data, schema=cls.pa_schema)
+
+    @classmethod
+    def tearDownClass(cls):
+        shutil.rmtree(cls.tempdir, ignore_errors=True)
+
+    def testAlternativeWrite(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema,
+                                            partition_keys=['dt'],
+                                            primary_keys=['user_id', 'dt'],
+                                            options={'bucket': '1'}
+                                            )
+        self.py_catalog.create_database('default', False)
+        self.py_catalog.create_table('default.test_alternative_write', schema, False)
+
+        self.py_table = self.py_catalog.get_table('default.test_alternative_write')
+        self.j_table = self.j_catalog.get_table('default.test_alternative_write')
+
+        self._write_data({
+            'user_id': self.expected_data.get('user_id')[0:6],
+            'item_id': [1001, 2001, 3001, 4001, 5001, 6001],
+            'behavior': ['l', 'k', 'j', 'i', None, 'g'],
+            'dt': self.expected_data.get('dt')[0:6]
+        }, 1)
+        data1 = {
+            'user_id': self.expected_data.get('user_id')[6:12],
+            'item_id': [7001, 8001, 9001, 1001, 1101, 1201],
+            'behavior': ['f-new', 'e-new', 'd-new', 'c-new', 'b-new', 'a-new'],
+            'dt': self.expected_data.get('dt')[6:12]
+        }
+        data2 = {
+            'user_id': self.expected_data.get('user_id')[3:9],
+            'item_id': [4001, 5001, 6001, 7001, 8001, 9001],
+            'behavior': ['i-new', None, 'g-new', 'f-new', 'e-new', 'd-new'],
+            'dt': self.expected_data.get('dt')[3:9]
+        }
+        datas = [data1, data2]
+        for idx, using_py in enumerate([0, 0, 1, 1, 0, 0]):  # TODO: this can be a random list
+            data1['item_id'] = [item + 1 for item in data1['item_id']]
+            data2['item_id'] = [item + 1 for item in data2['item_id']]
+            data = datas[idx % 2]
+            self._write_data(data, using_py)
+
+        j_read_builder = self.j_table.new_read_builder()
+        j_table_read = j_read_builder.new_read()
+        j_splits = j_read_builder.new_scan().plan().splits()
+        j_actual = j_table_read.to_arrow(j_splits).sort_by('user_id')
+        self.assertEqual(j_actual, self.expected_result)
+
+        py_read_builder = self.py_table.new_read_builder()
+        py_table_read = py_read_builder.new_read()
+        py_splits = py_read_builder.new_scan().plan().splits()
+        py_actual = py_table_read.to_arrow(py_splits).sort_by('user_id')
+        self.assertEqual(py_actual, self.expected_result)
+
+    def _write_data(self, data, using_py_table: int):
+        if using_py_table == 1:
+            write_builder = self.py_table.new_batch_write_builder()
+        else:
+            write_builder = self.j_table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        table_commit = write_builder.new_commit()
+        pa_table = pa.Table.from_pydict(data, schema=self.pa_schema)
+        table_write.write_arrow(pa_table)
+        table_commit.commit(table_write.prepare_commit())
+        table_write.close()
+        table_commit.close()
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/tests/py4j_impl/__init__.py
similarity index 64%
copy from paimon-python/pypaimon/manifest/schema/simple_stats.py
copy to paimon-python/pypaimon/tests/py4j_impl/__init__.py
index b291c12852..65b48d4d79 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/tests/py4j_impl/__init__.py
@@ -15,25 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-from dataclasses import dataclass
-
-from pypaimon.table.row.binary_row import BinaryRow
-
-
-@dataclass
-class SimpleStats:
-    min_value: BinaryRow
-    max_value: BinaryRow
-    null_count: int
-
-
-SIMPLE_STATS_SCHEMA = {
-    "type": "record",
-    "name": "SimpleStats",
-    "fields": [
-        {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
-    ]
-}
diff --git a/paimon-python/pypaimon/manifest/schema/simple_stats.py b/paimon-python/pypaimon/tests/py4j_impl/constants.py
similarity index 64%
copy from paimon-python/pypaimon/manifest/schema/simple_stats.py
copy to paimon-python/pypaimon/tests/py4j_impl/constants.py
index b291c12852..af7d1c8165 100644
--- a/paimon-python/pypaimon/manifest/schema/simple_stats.py
+++ b/paimon-python/pypaimon/tests/py4j_impl/constants.py
@@ -16,24 +16,13 @@
 # limitations under the License.
 
################################################################################
 
-from dataclasses import dataclass
+# ---------------------------- for env var ----------------------------
+PYPAIMON_CONN_INFO_PATH = '_PYPAIMON_CONN_INFO_PATH'
+PYPAIMON_JVM_ARGS = '_PYPAIMON_JVM_ARGS'
+PYPAIMON_JAVA_CLASSPATH = '_PYPAIMON_JAVA_CLASSPATH'
+PYPAIMON_HADOOP_CLASSPATH = '_PYPAIMON_HADOOP_CLASSPATH'
+PYPAIMON_MAIN_CLASS = 'org.apache.paimon.python.PythonGatewayServer'
+PYPAIMON_MAIN_ARGS = '_PYPAIMON_MAIN_ARGS'
 
-from pypaimon.table.row.binary_row import BinaryRow
-
-
-@dataclass
-class SimpleStats:
-    min_value: BinaryRow
-    max_value: BinaryRow
-    null_count: int
-
-
-SIMPLE_STATS_SCHEMA = {
-    "type": "record",
-    "name": "SimpleStats",
-    "fields": [
-        {"name": "_MIN_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_MAX_VALUES", "type": ["null", "bytes"], "default": None},
-        {"name": "_NULL_COUNTS", "type": ["null", "long"], "default": None},
-    ]
-}
+# ------------------ for tests (Please don't use it) ------------------
+PYPAIMON4J_TEST_MODE = '_PYPAIMON4J_TEST_MODE'
diff --git a/paimon-python/pypaimon/tests/py4j_impl/gateway_factory.py 
b/paimon-python/pypaimon/tests/py4j_impl/gateway_factory.py
new file mode 100644
index 0000000000..4f306e374b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/gateway_factory.py
@@ -0,0 +1,135 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import shutil
+import struct
+import tempfile
+import time
+from logging import WARN
+from threading import RLock
+
+from py4j.java_gateway import (CallbackServerParameters, GatewayParameters,
+                               JavaGateway, JavaPackage, java_import, logger)
+
+from pypaimon.tests.py4j_impl import constants
+from pypaimon.tests.py4j_impl.gateway_server import \
+    launch_gateway_server_process
+
+_gateway = None
+_lock = RLock()
+
+
+def get_gateway():
+    # type: () -> JavaGateway
+    global _gateway
+    with _lock:
+        if _gateway is None:
+            # Set the level to WARN to mute the noisy INFO level logs
+            logger.level = WARN
+            _gateway = launch_gateway()
+
+            callback_server = _gateway.get_callback_server()
+            callback_server_listening_address = callback_server.get_listening_address()
+            callback_server_listening_port = callback_server.get_listening_port()
+            _gateway.jvm.org.apache.paimon.python.PythonEnvUtils.resetCallbackClient(
+                _gateway.java_gateway_server,
+                callback_server_listening_address,
+                callback_server_listening_port)
+            import_paimon_view(_gateway)
+            install_py4j_hooks()
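+            # Register a watchdog object so the Java side can poll whether this Python process is still alive.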
+            _gateway.entry_point.put("Watchdog", Watchdog())
+    return _gateway
+
+
+def launch_gateway():
+    # type: () -> JavaGateway
+    """
+    Launch the JVM gateway and connect to it.
+    """
+
+    # Create a temporary directory where the gateway server should write the connection information.
+    conn_info_dir = tempfile.mkdtemp()
+    try:
+        fd, conn_info_file = tempfile.mkstemp(dir=conn_info_dir)
+        os.close(fd)
+        os.unlink(conn_info_file)
+
+        env = dict(os.environ)
+        env[constants.PYPAIMON_CONN_INFO_PATH] = conn_info_file
+
+        p = launch_gateway_server_process(env)
+
+        while not p.poll() and not os.path.isfile(conn_info_file):
+            time.sleep(0.1)
+
+        if not os.path.isfile(conn_info_file):
+            stderr_info = p.stderr.read().decode('utf-8')
+            raise RuntimeError(
+                "Java gateway process exited before sending its port 
number.\nStderr:\n"
+                + stderr_info
+            )
+
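+        # The gateway server writes its listening port into the connection file as a 4-byte big-endian integer.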
+        with open(conn_info_file, "rb") as info:
+            gateway_port = struct.unpack("!I", info.read(4))[0]
+    finally:
+        shutil.rmtree(conn_info_dir)
+
+    # Connect to the gateway
+    gateway = JavaGateway(
+        gateway_parameters=GatewayParameters(port=gateway_port, auto_convert=True),
+        callback_server_parameters=CallbackServerParameters(
+            port=0, daemonize=True, daemonize_connections=True))
+
+    return gateway
+
+
+def import_paimon_view(gateway):
+    java_import(gateway.jvm, "org.apache.paimon.table.*")
+    java_import(gateway.jvm, "org.apache.paimon.options.Options")
+    java_import(gateway.jvm, "org.apache.paimon.catalog.*")
+    java_import(gateway.jvm, "org.apache.paimon.schema.Schema*")
+    java_import(gateway.jvm, 'org.apache.paimon.types.*')
+    java_import(gateway.jvm, 'org.apache.paimon.python.*')
+    java_import(gateway.jvm, "org.apache.paimon.data.*")
+    java_import(gateway.jvm, "org.apache.paimon.predicate.PredicateBuilder")
+
+
+def install_py4j_hooks():
+    """
+    Hook Py4j classes such as JavaPackage to improve the exception message.
+    """
+    def wrapped_call(self, *args, **kwargs):
+        raise TypeError(
+            "Could not found the Java class '%s'. The Java dependencies could 
be specified via "
+            "command line argument '--jarfile' or the config option 
'pipeline.jars'" % self._fqn)
+
+    setattr(JavaPackage, '__call__', wrapped_call)
+
+
+class Watchdog(object):
+    """
+    Provided to the Java side so that it can check whether its parent Python process is alive.
+    """
+
+    def ping(self):
+        time.sleep(10)
+        return True
+
+    class Java:
+        implements = ["org.apache.paimon.python.PythonGatewayServer$Watchdog"]
diff --git a/paimon-python/pypaimon/tests/py4j_impl/gateway_server.py 
b/paimon-python/pypaimon/tests/py4j_impl/gateway_server.py
new file mode 100644
index 0000000000..5de2bcd154
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/gateway_server.py
@@ -0,0 +1,122 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import importlib.resources
+import os
+import platform
+import signal
+from subprocess import PIPE, Popen
+
+from pypaimon.tests.py4j_impl import constants
+
+
+def on_windows():
+    return platform.system() == "Windows"
+
+
+def find_java_executable():
+    java_executable = "java.exe" if on_windows() else "java"
+    java_home = None
+
+    if java_home is None and "JAVA_HOME" in os.environ:
+        java_home = os.environ["JAVA_HOME"]
+
+    if java_home is not None:
+        java_executable = os.path.join(java_home, "bin", java_executable)
+
+    return java_executable
+
+
+def launch_gateway_server_process(env):
+    java_executable = find_java_executable()
+    log_settings = []
+    jvm_args = env.get(constants.PYPAIMON_JVM_ARGS, '').split()
+    classpath = _get_classpath(env)
+    print(f"py4j classpath: {classpath}")
+    main_args = env.get(constants.PYPAIMON_MAIN_ARGS, '').split()
+    command = [
+        java_executable,
+        *jvm_args,
+        # default jvm args
+        "-XX:+IgnoreUnrecognizedVMOptions",
+        "--add-opens=jdk.proxy2/jdk.proxy2=ALL-UNNAMED",
+        "--add-opens=java.base/java.nio=ALL-UNNAMED",
+        *log_settings,
+        "-cp",
+        classpath,
+        "-c",
+        constants.PYPAIMON_MAIN_CLASS,
+        *main_args
+    ]
+
+    preexec_fn = None
+    if not on_windows():
+        def preexec_func():
+            # ignore ctrl-c / SIGINT
+            signal.signal(signal.SIGINT, signal.SIG_IGN)
+
+        preexec_fn = preexec_func
+    return Popen(list(filter(lambda c: len(c) != 0, command)),
+                 stdin=PIPE, stderr=PIPE, preexec_fn=preexec_fn, env=env)
+
+
+_JAVA_DEPS_PACKAGE = 'pypaimon.jars'
+
+
+def _get_classpath(env):
+    classpath = []
+
+    # note that jars are not packaged in test
+    test_mode = os.environ.get(constants.PYPAIMON4J_TEST_MODE)
+    if not test_mode or test_mode.lower() != "true":
+        jars = importlib.resources.files(_JAVA_DEPS_PACKAGE)
+        one_jar = next(iter(jars.iterdir()), None)
+        if not one_jar:
+            raise ValueError("Haven't found necessary python-java-bridge jar, 
this is unexpected.")
+        builtin_java_classpath = os.path.join(os.path.dirname(str(one_jar)), 
'*')
+        classpath.append(builtin_java_classpath)
+
+    # user defined
+    if constants.PYPAIMON_JAVA_CLASSPATH in env:
+        classpath.append(env[constants.PYPAIMON_JAVA_CLASSPATH])
+
+    # hadoop
+    hadoop_classpath = _get_hadoop_classpath(env)
+    if hadoop_classpath is not None:
+        classpath.append(hadoop_classpath)
+
+    return os.pathsep.join(classpath)
+
+
+_HADOOP_DEPS_PACKAGE = 'pypaimon.hadoop-deps'
+
+
+def _get_hadoop_classpath(env):
+    if constants.PYPAIMON_HADOOP_CLASSPATH in env:
+        return env[constants.PYPAIMON_HADOOP_CLASSPATH]
+    elif 'HADOOP_CLASSPATH' in env:
+        return env['HADOOP_CLASSPATH']
+    else:
+        # use built-in hadoop
+        jars = importlib.resources.files(_HADOOP_DEPS_PACKAGE)
+        one_jar = next(iter(jars.iterdir()), None)
+        if not one_jar:
+            raise EnvironmentError(f"The built-in Hadoop environment has been 
broken, this \
+            is unexpected. You can set one of 
'{constants.PYPAIMON_HADOOP_CLASSPATH}' or \
+            'HADOOP_CLASSPATH' to continue.")
+        return os.path.join(os.path.dirname(str(one_jar)), '*')
diff --git a/paimon-python/pypaimon/tests/py4j_impl/java_implementation.py 
b/paimon-python/pypaimon/tests/py4j_impl/java_implementation.py
new file mode 100644
index 0000000000..3001519bd7
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/java_implementation.py
@@ -0,0 +1,383 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# A pypaimon API implementation backed by the Java code base via the py4j library, used by the hybrid tests.
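+#
+# Rough usage sketch (warehouse_path and pa_schema below are placeholders; the
+# 'default' database is assumed to exist already):
+#     catalog = CatalogPy4j.create({'warehouse': warehouse_path})
+#     catalog.create_table('default.t', SchemaPy4j(pa_schema), False)
+#     table = catalog.get_table('default.t')
+#     write_builder = table.new_batch_write_builder()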
+
+from typing import Any, Iterator, List, Optional
+
+import pyarrow as pa
+
+from pypaimon.tests.py4j_impl import java_utils
+from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+from pypaimon.tests.py4j_impl.java_utils import (deserialize_java_object,
+                                                 serialize_java_object)
+
+
+class SchemaPy4j:
+    """Schema of a table."""
+
+    def __init__(self,
+                 pa_schema: pa.Schema,
+                 partition_keys: Optional[List[str]] = None,
+                 primary_keys: Optional[List[str]] = None,
+                 options: Optional[dict] = None,
+                 comment: Optional[str] = None):
+        self.pa_schema = pa_schema
+        self.partition_keys = partition_keys
+        self.primary_keys = primary_keys
+        self.options = options
+        self.comment = comment
+
+
+class CatalogPy4j:
+
+    def __init__(self, j_catalog, catalog_options: dict):
+        self._j_catalog = j_catalog
+        self._catalog_options = catalog_options
+
+    @staticmethod
+    def create(catalog_options: dict) -> 'CatalogPy4j':
+        j_catalog_context = java_utils.to_j_catalog_context(catalog_options)
+        gateway = get_gateway()
+        j_catalog = gateway.jvm.CatalogFactory.createCatalog(j_catalog_context)
+        return CatalogPy4j(j_catalog, catalog_options)
+
+    def get_table(self, identifier: str) -> 'TablePy4j':
+        j_identifier = java_utils.to_j_identifier(identifier)
+        j_table = self._j_catalog.getTable(j_identifier)
+        return TablePy4j(j_table, self._catalog_options)
+
+    def create_database(self, name: str, ignore_if_exists: bool, properties: Optional[dict] = None):
+        if properties is None:
+            properties = {}
+        self._j_catalog.createDatabase(name, ignore_if_exists, properties)
+
+    def create_table(self, identifier: str, schema: SchemaPy4j, ignore_if_exists: bool):
+        j_identifier = java_utils.to_j_identifier(identifier)
+        j_schema = java_utils.to_paimon_schema(schema)
+        self._j_catalog.createTable(j_identifier, j_schema, ignore_if_exists)
+
+
+class TablePy4j:
+
+    def __init__(self, j_table, catalog_options: dict):
+        self._j_table = j_table
+        self._catalog_options = catalog_options
+
+    def new_read_builder(self) -> 'ReadBuilderPy4j':
+        j_read_builder = get_gateway().jvm.InvocationUtil.getReadBuilder(self._j_table)
+        return ReadBuilderPy4j(j_read_builder, self._j_table.rowType(), self._catalog_options)
+
+    def new_batch_write_builder(self) -> 'BatchWriteBuilderPy4j':
+        java_utils.check_batch_write(self._j_table)
+        j_batch_write_builder = get_gateway().jvm.InvocationUtil.getBatchWriteBuilder(self._j_table)
+        return BatchWriteBuilderPy4j(j_batch_write_builder)
+
+
+class ReadBuilderPy4j:
+
+    def __init__(self, j_read_builder, j_row_type, catalog_options: dict):
+        self._j_read_builder = j_read_builder
+        self._j_row_type = j_row_type
+        self._catalog_options = catalog_options
+
+    def with_filter(self, predicate: 'PredicatePy4j'):
+        self._j_read_builder.withFilter(predicate.to_j_predicate())
+        return self
+
+    def with_projection(self, projection: List[str]) -> 'ReadBuilderPy4j':
+        field_names = list(map(lambda field: field.name(), self._j_row_type.getFields()))
+        int_projection = list(map(lambda p: field_names.index(p), projection))
+        gateway = get_gateway()
+        int_projection_arr = gateway.new_array(gateway.jvm.int, len(projection))
+        for i in range(len(projection)):
+            int_projection_arr[i] = int_projection[i]
+        self._j_read_builder.withProjection(int_projection_arr)
+        return self
+
+    def with_limit(self, limit: int) -> 'ReadBuilderPy4j':
+        self._j_read_builder.withLimit(limit)
+        return self
+
+    def new_scan(self) -> 'TableScanPy4j':
+        j_table_scan = self._j_read_builder.newScan()
+        return TableScanPy4j(j_table_scan)
+
+    def new_read(self) -> 'TableReadPy4j':
+        j_table_read = self._j_read_builder.newRead().executeFilter()
+        return TableReadPy4j(j_table_read, self._j_read_builder.readType(), self._catalog_options)
+
+    def new_predicate_builder(self) -> 'PredicateBuilderPy4j':
+        return PredicateBuilderPy4j(self._j_row_type)
+
+    def read_type(self) -> 'RowTypePy4j':
+        return RowTypePy4j(self._j_read_builder.readType())
+
+
+class RowTypePy4j:
+
+    def __init__(self, j_row_type):
+        self._j_row_type = j_row_type
+
+    def as_arrow(self) -> "pa.Schema":
+        return java_utils.to_arrow_schema(self._j_row_type)
+
+
+class TableScanPy4j:
+
+    def __init__(self, j_table_scan):
+        self._j_table_scan = j_table_scan
+
+    def plan(self) -> 'PlanPy4j':
+        j_plan = self._j_table_scan.plan()
+        j_splits = j_plan.splits()
+        return PlanPy4j(j_splits)
+
+
+class PlanPy4j:
+
+    def __init__(self, j_splits):
+        self._j_splits = j_splits
+
+    def splits(self) -> List['SplitPy4j']:
+        return list(map(lambda s: self._build_single_split(s), self._j_splits))
+
+    def _build_single_split(self, j_split) -> 'SplitPy4j':
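+        # Serialize the Java split to bytes so it can be shipped back to the JVM later by TableReadPy4j.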
+        j_split_bytes = serialize_java_object(j_split)
+        row_count = j_split.rowCount()
+        files_optional = j_split.convertToRawFiles()
+        if not files_optional.isPresent():
+            file_size = 0
+            file_paths = []
+        else:
+            files = files_optional.get()
+            file_size = sum(file.length() for file in files)
+            file_paths = [file.path() for file in files]
+        return SplitPy4j(j_split_bytes, row_count, file_size, file_paths)
+
+
+class SplitPy4j:
+
+    def __init__(self, j_split_bytes, row_count: int, file_size: int, file_paths: List[str]):
+        self._j_split_bytes = j_split_bytes
+        self._row_count = row_count
+        self._file_size = file_size
+        self._file_paths = file_paths
+
+    def to_j_split(self):
+        return deserialize_java_object(self._j_split_bytes)
+
+    def row_count(self) -> int:
+        return self._row_count
+
+    def file_size(self) -> int:
+        return self._file_size
+
+    def file_paths(self) -> List[str]:
+        return self._file_paths
+
+
+class TableReadPy4j:
+
+    def __init__(self, j_table_read, j_read_type, catalog_options):
+        self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
+        self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
+            j_table_read, j_read_type, 1)
+
+    def to_arrow(self, splits):
+        record_batch_reader = self.to_arrow_batch_reader(splits)
+        return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
+
+    def to_arrow_batch_reader(self, splits):
+        j_splits = list(map(lambda s: s.to_j_split(), splits))
+        self._j_bytes_reader.setSplits(j_splits)
+        batch_iterator = self._batch_generator()
+        return pa.RecordBatchReader.from_batches(self._arrow_schema, batch_iterator)
+
+    def _batch_generator(self) -> Iterator[pa.RecordBatch]:
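+        # Each call to next() on the Java reader yields one Arrow IPC stream, or None once all splits are exhausted.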
+        while True:
+            next_bytes = self._j_bytes_reader.next()
+            if next_bytes is None:
+                break
+            else:
+                stream_reader = pa.RecordBatchStreamReader(pa.BufferReader(next_bytes))
+                yield from stream_reader
+
+
+class BatchWriteBuilderPy4j:
+
+    def __init__(self, j_batch_write_builder):
+        self._j_batch_write_builder = j_batch_write_builder
+
+    def overwrite(self, static_partition: Optional[dict] = None) -> 'BatchWriteBuilderPy4j':
+        if static_partition is None:
+            static_partition = {}
+        self._j_batch_write_builder.withOverwrite(static_partition)
+        return self
+
+    def new_write(self) -> 'BatchTableWritePy4j':
+        j_batch_table_write = self._j_batch_write_builder.newWrite()
+        return BatchTableWritePy4j(j_batch_table_write, self._j_batch_write_builder.rowType())
+
+    def new_commit(self) -> 'BatchTableCommitPy4j':
+        j_batch_table_commit = self._j_batch_write_builder.newCommit()
+        return BatchTableCommitPy4j(j_batch_table_commit)
+
+
+class BatchTableWritePy4j:
+
+    def __init__(self, j_batch_table_write, j_row_type):
+        self._j_batch_table_write = j_batch_table_write
+        self._j_bytes_writer = get_gateway().jvm.InvocationUtil.createBytesWriter(
+            j_batch_table_write, j_row_type)
+        self._arrow_schema = java_utils.to_arrow_schema(j_row_type)
+
+    def write_arrow(self, table):
+        for record_batch in table.to_reader():
+            self._write_arrow_batch(record_batch)
+
+    def write_arrow_batch(self, record_batch):
+        self._write_arrow_batch(record_batch)
+
+    def _write_arrow_batch(self, record_batch):
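+        # Encode the batch as an Arrow IPC stream and hand the raw bytes to the Java-side writer.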
+        stream = pa.BufferOutputStream()
+        with pa.RecordBatchStreamWriter(stream, record_batch.schema) as writer:
+            writer.write(record_batch)
+        arrow_bytes = stream.getvalue().to_pybytes()
+        self._j_bytes_writer.write(arrow_bytes)
+
+    def prepare_commit(self) -> List['CommitMessagePy4j']:
+        j_commit_messages = self._j_batch_table_write.prepareCommit()
+        return list(map(lambda cm: CommitMessagePy4j(cm), j_commit_messages))
+
+    def close(self):
+        self._j_batch_table_write.close()
+        self._j_bytes_writer.close()
+
+
+class CommitMessagePy4j:
+
+    def __init__(self, j_commit_message):
+        self._j_commit_message = j_commit_message
+
+    def to_j_commit_message(self):
+        return self._j_commit_message
+
+
+class BatchTableCommitPy4j:
+
+    def __init__(self, j_batch_table_commit):
+        self._j_batch_table_commit = j_batch_table_commit
+
+    def commit(self, commit_messages: List[CommitMessagePy4j]):
+        j_commit_messages = list(map(lambda cm: cm.to_j_commit_message(), commit_messages))
+        self._j_batch_table_commit.commit(j_commit_messages)
+
+    def close(self):
+        self._j_batch_table_commit.close()
+
+
+class PredicatePy4j:
+
+    def __init__(self, j_predicate_bytes):
+        self._j_predicate_bytes = j_predicate_bytes
+
+    def to_j_predicate(self):
+        return deserialize_java_object(self._j_predicate_bytes)
+
+
+class PredicateBuilderPy4j:
+
+    def __init__(self, j_row_type):
+        self._field_names = j_row_type.getFieldNames()
+        self._j_row_type = j_row_type
+        self._j_predicate_builder = get_gateway().jvm.PredicateBuilder(j_row_type)
+
+    def _build(self, method: str, field: str, literals: Optional[List[Any]] = None):
+        error = ValueError(f'The field {field} is not in field list {self._field_names}.')
+        try:
+            index = self._field_names.index(field)
+            if index == -1:
+                raise error
+        except ValueError:
+            raise error
+
+        if literals is None:
+            literals = []
+
+        j_predicate = get_gateway().jvm.PredicationUtil.build(
+            self._j_row_type,
+            self._j_predicate_builder,
+            method,
+            index,
+            literals
+        )
+        return PredicatePy4j(serialize_java_object(j_predicate))
+
+    def equal(self, field: str, literal: Any) -> PredicatePy4j:
+        return self._build('equal', field, [literal])
+
+    def not_equal(self, field: str, literal: Any) -> PredicatePy4j:
+        return self._build('notEqual', field, [literal])
+
+    def less_than(self, field: str, literal: Any) -> PredicatePy4j:
+        return self._build('lessThan', field, [literal])
+
+    def less_or_equal(self, field: str, literal: Any) -> PredicatePy4j:
+        return self._build('lessOrEqual', field, [literal])
+
+    def greater_than(self, field: str, literal: Any) -> PredicatePy4j:
+        return self._build('greaterThan', field, [literal])
+
+    def greater_or_equal(self, field: str, literal: Any) -> PredicatePy4j:
+        return self._build('greaterOrEqual', field, [literal])
+
+    def is_null(self, field: str) -> PredicatePy4j:
+        return self._build('isNull', field)
+
+    def is_not_null(self, field: str) -> PredicatePy4j:
+        return self._build('isNotNull', field)
+
+    def startswith(self, field: str, pattern_literal: Any) -> PredicatePy4j:
+        return self._build('startsWith', field, [pattern_literal])
+
+    def endswith(self, field: str, pattern_literal: Any) -> PredicatePy4j:
+        return self._build('endsWith', field, [pattern_literal])
+
+    def contains(self, field: str, pattern_literal: Any) -> PredicatePy4j:
+        return self._build('contains', field, [pattern_literal])
+
+    def is_in(self, field: str, literals: List[Any]) -> PredicatePy4j:
+        return self._build('in', field, literals)
+
+    def is_not_in(self, field: str, literals: List[Any]) -> PredicatePy4j:
+        return self._build('notIn', field, literals)
+
+    def between(self, field: str, included_lower_bound: Any, included_upper_bound: Any) \
+            -> PredicatePy4j:
+        return self._build('between', field, [included_lower_bound, included_upper_bound])
+
+    def and_predicates(self, predicates: List[PredicatePy4j]) -> PredicatePy4j:
+        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        j_predicate = get_gateway().jvm.PredicationUtil.buildAnd(predicates)
+        return PredicatePy4j(serialize_java_object(j_predicate))
+
+    def or_predicates(self, predicates: List[PredicatePy4j]) -> PredicatePy4j:
+        predicates = list(map(lambda p: p.to_j_predicate(), predicates))
+        j_predicate = get_gateway().jvm.PredicationUtil.buildOr(predicates)
+        return PredicatePy4j(serialize_java_object(j_predicate))
diff --git a/paimon-python/pypaimon/tests/py4j_impl/java_utils.py 
b/paimon-python/pypaimon/tests/py4j_impl/java_utils.py
new file mode 100644
index 0000000000..1329c2cd7c
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py4j_impl/java_utils.py
@@ -0,0 +1,132 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import pyarrow as pa
+
+
+def to_j_catalog_context(catalog_options: dict):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    gateway = get_gateway()
+    j_options = gateway.jvm.Options(catalog_options)
+    return gateway.jvm.CatalogContext.create(j_options)
+
+
+def to_j_identifier(identifier: str):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    return get_gateway().jvm.Identifier.fromString(identifier)
+
+
+def to_paimon_schema(schema: 'SchemaPy4j'):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    j_schema_builder = get_gateway().jvm.Schema.newBuilder()
+
+    if schema.partition_keys is not None:
+        j_schema_builder.partitionKeys(schema.partition_keys)
+
+    if schema.primary_keys is not None:
+        j_schema_builder.primaryKey(schema.primary_keys)
+
+    if schema.options is not None:
+        j_schema_builder.options(schema.options)
+
+    j_schema_builder.comment(schema.comment)
+
+    for field in schema.pa_schema:
+        column_name = field.name
+        column_type = _to_j_type(column_name, field.type)
+        j_schema_builder.column(column_name, column_type)
+    return j_schema_builder.build()
+
+
+def check_batch_write(j_table):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    gateway = get_gateway()
+    bucket_mode = j_table.bucketMode()
+    if bucket_mode == gateway.jvm.BucketMode.HASH_DYNAMIC \
+            or bucket_mode == gateway.jvm.BucketMode.CROSS_PARTITION:
+        raise TypeError("Doesn't support writing dynamic bucket or cross 
partition table.")
+
+
+def _to_j_type(name, pa_type):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    jvm = get_gateway().jvm
+    # int
+    if pa.types.is_int8(pa_type):
+        return jvm.DataTypes.TINYINT()
+    elif pa.types.is_int16(pa_type):
+        return jvm.DataTypes.SMALLINT()
+    elif pa.types.is_int32(pa_type):
+        return jvm.DataTypes.INT()
+    elif pa.types.is_int64(pa_type):
+        return jvm.DataTypes.BIGINT()
+    # float
+    elif pa.types.is_float16(pa_type) or pa.types.is_float32(pa_type):
+        return jvm.DataTypes.FLOAT()
+    elif pa.types.is_float64(pa_type):
+        return jvm.DataTypes.DOUBLE()
+    # string
+    elif pa.types.is_string(pa_type):
+        return jvm.DataTypes.STRING()
+    # bool
+    elif pa.types.is_boolean(pa_type):
+        return jvm.DataTypes.BOOLEAN()
+    elif pa.types.is_null(pa_type):
+        print(f"WARN: The type of column '{name}' is null, "
+              "and it will be converted to string type by default. "
+              "Please check if the original type is string. "
+              f"If not, please manually specify the type of '{name}'.")
+        return jvm.DataTypes.STRING()
+    else:
+        raise ValueError(f'Found unsupported data type {str(pa_type)} for field {name}.')
+
+
+def to_arrow_schema(j_row_type):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    # init arrow schema
+    schema_bytes = get_gateway().jvm.SchemaUtil.getArrowSchema(j_row_type)
+    schema_reader = pa.RecordBatchStreamReader(pa.BufferReader(schema_bytes))
+    arrow_schema = schema_reader.schema
+    schema_reader.close()
+    return arrow_schema
+
+
+def serialize_java_object(java_obj) -> bytes:
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    gateway = get_gateway()
+    util = gateway.jvm.org.apache.paimon.utils.InstantiationUtil
+    try:
+        java_bytes = util.serializeObject(java_obj)
+        return bytes(java_bytes)
+    except Exception as e:
+        raise RuntimeError(f"Java serialization failed: {e}")
+
+
+def deserialize_java_object(bytes_data):
+    from pypaimon.tests.py4j_impl.gateway_factory import get_gateway
+
+    gateway = get_gateway()
+    cl = get_gateway().jvm.Thread.currentThread().getContextClassLoader()
+    util = gateway.jvm.org.apache.paimon.utils.InstantiationUtil
+    return util.deserializeObject(bytes_data, cl)
diff --git a/paimon-python/pypaimon/tests/py4j_impl/test_deps/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar b/paimon-python/pypaimon/tests/py4j_impl/test_deps/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
new file mode 100644
index 0000000000..a7b50b073a
Binary files /dev/null and b/paimon-python/pypaimon/tests/py4j_impl/test_deps/flink-shaded-hadoop-2-uber-2.8.3-10.0.jar differ
diff --git a/paimon-python/pypaimon/tests/py4j_impl/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar b/paimon-python/pypaimon/tests/py4j_impl/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar
new file mode 100644
index 0000000000..dfbe9d7d0b
Binary files /dev/null and b/paimon-python/pypaimon/tests/py4j_impl/test_deps/paimon-python-java-bridge-0.9-SNAPSHOT.jar differ
diff --git a/paimon-python/pypaimon/tests/reader_primary_key_test.py 
b/paimon-python/pypaimon/tests/reader_primary_key_test.py
index 4671e32996..b9b115dfa4 100644
--- a/paimon-python/pypaimon/tests/reader_primary_key_test.py
+++ b/paimon-python/pypaimon/tests/reader_primary_key_test.py
@@ -38,10 +38,10 @@ class PkReaderTest(unittest.TestCase):
         cls.catalog.create_database('default', False)
 
         cls.pa_schema = pa.schema([
-            ('user_id', pa.int32()),
+            pa.field('user_id', pa.int32(), nullable=False),
             ('item_id', pa.int64()),
             ('behavior', pa.string()),
-            ('dt', pa.string())
+            pa.field('dt', pa.string(), nullable=False)
         ])
         cls.expected = pa.Table.from_pydict({
             'user_id': [1, 2, 3, 4, 5, 7, 8],
diff --git a/paimon-python/pypaimon/tests/rest_catalog_base_test.py 
b/paimon-python/pypaimon/tests/rest_catalog_base_test.py
index 773d94abed..c035957ccc 100644
--- a/paimon-python/pypaimon/tests/rest_catalog_base_test.py
+++ b/paimon-python/pypaimon/tests/rest_catalog_base_test.py
@@ -186,7 +186,7 @@ class RESTCatalogBaseTest(unittest.TestCase):
         self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_table/snapshot/snapshot-1"))
         self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_table/manifest"))
         self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_table/dt=p1"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_table/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_table/manifest/*")), 3)
 
     def _write_test_table(self, table):
         write_builder = table.new_batch_write_builder()
diff --git a/paimon-python/pypaimon/tests/rest_server.py 
b/paimon-python/pypaimon/tests/rest_server.py
index d0486ec02c..c326f3525d 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -495,8 +495,8 @@ class RESTCatalogServer:
 
     def _write_snapshot_files(self, identifier: Identifier, snapshot, 
statistics):
         """Write snapshot and related files to the file system"""
-        import os
         import json
+        import os
         import uuid
 
         # Construct table path: {warehouse}/{database}/{table}
diff --git a/paimon-python/pypaimon/tests/rest_table_test.py 
b/paimon-python/pypaimon/tests/rest_table_test.py
index 9c64c17003..b905fa593b 100644
--- a/paimon-python/pypaimon/tests/rest_table_test.py
+++ b/paimon-python/pypaimon/tests/rest_table_test.py
@@ -168,7 +168,7 @@ class RESTTableTest(RESTCatalogBaseTest):
         self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_postpone/snapshot/LATEST"))
         self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_postpone/snapshot/snapshot-1"))
         self.assertTrue(os.path.exists(self.warehouse + 
"/default/test_postpone/manifest"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/manifest/*")), 3)
         self.assertEqual(len(glob.glob(self.warehouse + "/default/test_postpone/user_id=2/bucket-postpone/*.avro")), 1)
 
     def test_postpone_read_write(self):
diff --git a/paimon-python/pypaimon/tests/schema_test.py 
b/paimon-python/pypaimon/tests/schema_test.py
index daa9442e50..766676e95f 100644
--- a/paimon-python/pypaimon/tests/schema_test.py
+++ b/paimon-python/pypaimon/tests/schema_test.py
@@ -32,7 +32,8 @@ class SchemaTestCase(unittest.TestCase):
             DataField(0, "name", AtomicType('INT'), 'desc  name'),
             DataField(1, "arr", ArrayType(True, AtomicType('INT')), 'desc 
arr1'),
             DataField(2, "map1",
-                      MapType(False, AtomicType('INT'), MapType(False, AtomicType('INT'), AtomicType('INT'))),
+                      MapType(False, AtomicType('INT', False),
+                              MapType(False, AtomicType('INT', False), AtomicType('INT', False))),
                       'desc map1'),
         ]
         table_schema = TableSchema(TableSchema.CURRENT_VERSION, 
len(data_fields), data_fields,
diff --git a/paimon-python/pypaimon/tests/writer_test.py 
b/paimon-python/pypaimon/tests/writer_test.py
index c0c242d2c4..8bf38f72e0 100644
--- a/paimon-python/pypaimon/tests/writer_test.py
+++ b/paimon-python/pypaimon/tests/writer_test.py
@@ -71,7 +71,7 @@ class WriterTest(unittest.TestCase):
         self.assertTrue(os.path.exists(self.warehouse + 
"/test_db.db/test_table/snapshot/snapshot-1"))
         self.assertTrue(os.path.exists(self.warehouse + 
"/test_db.db/test_table/manifest"))
         self.assertTrue(os.path.exists(self.warehouse + 
"/test_db.db/test_table/bucket-0"))
-        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*.avro")), 2)
+        self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/manifest/*")), 3)
         self.assertEqual(len(glob.glob(self.warehouse + "/test_db.db/test_table/bucket-0/*.parquet")), 1)
 
         with open(self.warehouse + 
'/test_db.db/test_table/snapshot/snapshot-1', 'r', encoding='utf-8') as file:
diff --git a/paimon-python/pypaimon/write/commit_message.py 
b/paimon-python/pypaimon/write/commit_message.py
index 1a2177abb1..4e4c0f0b48 100644
--- a/paimon-python/pypaimon/write/commit_message.py
+++ b/paimon-python/pypaimon/write/commit_message.py
@@ -15,31 +15,17 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
+from dataclasses import dataclass
 from typing import List, Tuple
 
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
 
 
+@dataclass
 class CommitMessage:
-    """Python implementation of CommitMessage"""
-
-    def __init__(self, partition: Tuple, bucket: int, new_files: List[DataFileMeta]):
-        self._partition = partition
-        self._bucket = bucket
-        self._new_files = new_files or []
-
-    def partition(self) -> Tuple:
-        """Get the partition of this commit message."""
-        return self._partition
-
-    def bucket(self) -> int:
-        """Get the bucket of this commit message."""
-        return self._bucket
-
-    def new_files(self) -> List[DataFileMeta]:
-        """Get the list of new files."""
-        return self._new_files
+    partition: Tuple
+    bucket: int
+    new_files: List[DataFileMeta]
 
     def is_empty(self):
-        return not self._new_files
+        return not self.new_files
diff --git a/paimon-python/pypaimon/write/file_store_commit.py 
b/paimon-python/pypaimon/write/file_store_commit.py
index e98cb8610f..ca26981333 100644
--- a/paimon-python/pypaimon/write/file_store_commit.py
+++ b/paimon-python/pypaimon/write/file_store_commit.py
@@ -17,14 +17,18 @@
 
################################################################################
 
 import time
+import uuid
 from pathlib import Path
 from typing import List
 
 from pypaimon.catalog.snapshot_commit import PartitionStatistics, 
SnapshotCommit
 from pypaimon.manifest.manifest_file_manager import ManifestFileManager
 from pypaimon.manifest.manifest_list_manager import ManifestListManager
+from pypaimon.manifest.schema.manifest_file_meta import ManifestFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.snapshot.snapshot import Snapshot
 from pypaimon.snapshot.snapshot_manager import SnapshotManager
+from pypaimon.table.row.binary_row import BinaryRow
 from pypaimon.write.commit_message import CommitMessage
 
 
@@ -55,39 +59,69 @@ class FileStoreCommit:
         if not commit_messages:
             return
 
-        new_manifest_files = self.manifest_file_manager.write(commit_messages)
-        if not new_manifest_files:
-            return
+        unique_id = uuid.uuid4()
+        base_manifest_list = f"manifest-list-{unique_id}-0"
+        delta_manifest_list = f"manifest-list-{unique_id}-1"
+
+        # process new_manifest
+        new_manifest_file = f"manifest-{str(uuid.uuid4())}-0"
+        self.manifest_file_manager.write(new_manifest_file, commit_messages)
+
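+        # Aggregate per-column min/max/null-count over the partition values of all commit messages.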
+        partition_columns = list(zip(*(msg.partition for msg in commit_messages)))
+        partition_min_stats = [min(col) for col in partition_columns]
+        partition_max_stats = [max(col) for col in partition_columns]
+        partition_null_counts = [sum(value is None for value in col) for col in partition_columns]
+        if not all(count == 0 for count in partition_null_counts):
+            raise RuntimeError("Partition value should not be null")
+
+        new_manifest_list = ManifestFileMeta(
+            file_name=new_manifest_file,
+            file_size=self.table.file_io.get_file_size(
+                self.manifest_file_manager.manifest_path / new_manifest_file),
+            num_added_files=sum(len(msg.new_files) for msg in commit_messages),
+            num_deleted_files=0,
+            partition_stats=SimpleStats(
+                min_value=BinaryRow(
+                    values=partition_min_stats,
+                    fields=self.table.table_schema.get_partition_key_fields(),
+                ),
+                max_value=BinaryRow(
+                    values=partition_max_stats,
+                    fields=self.table.table_schema.get_partition_key_fields(),
+                ),
+                null_count=partition_null_counts,
+            ),
+            schema_id=self.table.table_schema.id,
+        )
+        self.manifest_list_manager.write(delta_manifest_list, [new_manifest_list])
 
+        # process existing_manifest
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
-
-        existing_manifest_files = []
-        record_count_add = self._generate_record_count_add(commit_messages)
-        total_record_count = record_count_add
-
+        total_record_count = 0
         if latest_snapshot:
-            existing_manifest_files = self.manifest_list_manager.read_all_manifest_files(latest_snapshot)
+            existing_manifest_files = self.manifest_list_manager.read_all(latest_snapshot)
             previous_record_count = latest_snapshot.total_record_count
             if previous_record_count:
                 total_record_count += previous_record_count
+        else:
+            existing_manifest_files = []
+        self.manifest_list_manager.write(base_manifest_list, existing_manifest_files)
 
-        new_manifest_files.extend(existing_manifest_files)
-        manifest_list = self.manifest_list_manager.write(new_manifest_files)
-
+        # process snapshot
         new_snapshot_id = self._generate_snapshot_id()
+        record_count_add = self._generate_record_count_add(commit_messages)
+        total_record_count += record_count_add
         snapshot_data = Snapshot(
-            version=3,
+            version=1,
             id=new_snapshot_id,
-            schema_id=0,
-            base_manifest_list=manifest_list,
-            delta_manifest_list=manifest_list,
+            schema_id=self.table.table_schema.id,
+            base_manifest_list=base_manifest_list,
+            delta_manifest_list=delta_manifest_list,
             total_record_count=total_record_count,
             delta_record_count=record_count_add,
             commit_user=self.commit_user,
             commit_identifier=commit_identifier,
             commit_kind="APPEND",
             time_millis=int(time.time() * 1000),
-            log_offsets={},
         )
 
         # Generate partition statistics for the commit
@@ -101,46 +135,11 @@ class FileStoreCommit:
 
     def overwrite(self, partition, commit_messages: List[CommitMessage], 
commit_identifier: int):
         """Commit the given commit messages in overwrite mode."""
-        if not commit_messages:
-            return
-
-        new_manifest_files = self.manifest_file_manager.write(commit_messages)
-        if not new_manifest_files:
-            return
-
-        # In overwrite mode, we don't merge with existing manifests
-        manifest_list = self.manifest_list_manager.write(new_manifest_files)
-
-        record_count_add = self._generate_record_count_add(commit_messages)
-
-        new_snapshot_id = self._generate_snapshot_id()
-        snapshot_data = Snapshot(
-            version=3,
-            id=new_snapshot_id,
-            schema_id=0,
-            base_manifest_list=manifest_list,
-            delta_manifest_list=manifest_list,
-            total_record_count=record_count_add,
-            delta_record_count=record_count_add,
-            commit_user=self.commit_user,
-            commit_identifier=commit_identifier,
-            commit_kind="OVERWRITE",
-            time_millis=int(time.time() * 1000),
-            log_offsets={},
-        )
-
-        # Generate partition statistics for the commit
-        statistics = self._generate_partition_statistics(commit_messages)
-
-        # Use SnapshotCommit for atomic commit
-        with self.snapshot_commit:
-            success = self.snapshot_commit.commit(snapshot_data, 
self.table.current_branch(), statistics)
-            if not success:
-                raise RuntimeError(f"Failed to commit snapshot 
{new_snapshot_id}")
+        raise RuntimeError("overwrite unsupported yet")
 
     def abort(self, commit_messages: List[CommitMessage]):
         for message in commit_messages:
-            for file in message.new_files():
+            for file in message.new_files:
                 try:
                     file_path_obj = Path(file.file_path)
                     if file_path_obj.exists():
@@ -179,7 +178,7 @@ class FileStoreCommit:
 
         for message in commit_messages:
             # Convert partition tuple to dictionary for PartitionStatistics
-            partition_value = message.partition()  # Call the method to get partition value
+            partition_value = message.partition  # Partition tuple of this commit message
             if partition_value:
                 # Assuming partition is a tuple and we need to convert it to a 
dict
                 # This may need adjustment based on actual partition format
@@ -213,8 +212,7 @@ class FileStoreCommit:
 
             # Process each file in the commit message
             # Following Java implementation: PartitionEntry.fromDataFile()
-            new_files = message.new_files()
-            for file_meta in new_files:
+            for file_meta in message.new_files:
                 # Extract actual file metadata (following Java DataFileMeta 
pattern)
                 record_count = file_meta.row_count
                 file_size_in_bytes = file_meta.file_size
@@ -266,7 +264,7 @@ class FileStoreCommit:
         record_count = 0
 
         for message in commit_messages:
-            new_files = message.new_files()
+            new_files = message.new_files
             for file_meta in new_files:
                 record_count += file_meta.row_count
 
diff --git a/paimon-python/pypaimon/write/file_store_write.py 
b/paimon-python/pypaimon/write/file_store_write.py
index 3bf6150b03..bcef10a4c3 100644
--- a/paimon-python/pypaimon/write/file_store_write.py
+++ b/paimon-python/pypaimon/write/file_store_write.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
 from typing import Dict, List, Tuple
 
 import pyarrow as pa
@@ -34,6 +33,7 @@ class FileStoreWrite:
 
         self.table: FileStoreTable = table
         self.data_writers: Dict[Tuple, DataWriter] = {}
+        self.max_seq_numbers = self._seq_number_stats()  # TODO: build this on demand instead of eagerly for all partitions/buckets
 
     def write(self, partition: Tuple, bucket: int, data: pa.RecordBatch):
         key = (partition, bucket)
@@ -48,12 +48,14 @@ class FileStoreWrite:
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
+                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
             )
         else:
             return AppendOnlyDataWriter(
                 table=self.table,
                 partition=partition,
                 bucket=bucket,
+                max_seq_number=self.max_seq_numbers.get((partition, bucket), 1),
             )
 
     def prepare_commit(self) -> List[CommitMessage]:
@@ -74,3 +76,33 @@ class FileStoreWrite:
         for writer in self.data_writers.values():
             writer.close()
         self.data_writers.clear()
+
+    def _seq_number_stats(self) -> dict:
+        from pypaimon.manifest.manifest_file_manager import ManifestFileManager
+        from pypaimon.manifest.manifest_list_manager import ManifestListManager
+        from pypaimon.snapshot.snapshot_manager import SnapshotManager
+
+        snapshot_manager = SnapshotManager(self.table)
+        manifest_list_manager = ManifestListManager(self.table)
+        manifest_file_manager = ManifestFileManager(self.table)
+
+        latest_snapshot = snapshot_manager.get_latest_snapshot()
+        if not latest_snapshot:
+            return {}
+        manifest_files = manifest_list_manager.read_all(latest_snapshot)
+
+        file_entries = []
+        for manifest_file in manifest_files:
+            manifest_entries = manifest_file_manager.read(manifest_file.file_name)
+            for entry in manifest_entries:
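+                # kind == 0 denotes an ADD entry; deleted files do not contribute to the max sequence numbers.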
+                if entry.kind == 0:
+                    file_entries.append(entry)
+
+        max_seq_numbers = {}
+        for entry in file_entries:
+            partition_key = (tuple(entry.partition.values), entry.bucket)
+            current_seq_num = entry.file.max_sequence_number
+            existing_max = max_seq_numbers.get(partition_key, -1)
+            if current_seq_num > existing_max:
+                max_seq_numbers[partition_key] = current_seq_num
+        return max_seq_numbers
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index c11d991b84..5d9641718c 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -15,16 +15,18 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
 import uuid
 from abc import ABC, abstractmethod
+from datetime import datetime
 from pathlib import Path
 from typing import List, Optional, Tuple
 
 import pyarrow as pa
+import pyarrow.compute as pc
 
 from pypaimon.common.core_options import CoreOptions
 from pypaimon.manifest.schema.data_file_meta import DataFileMeta
+from pypaimon.manifest.schema.simple_stats import SimpleStats
 from pypaimon.table.bucket_mode import BucketMode
 from pypaimon.table.row.binary_row import BinaryRow
 
@@ -32,7 +34,7 @@ from pypaimon.table.row.binary_row import BinaryRow
 class DataWriter(ABC):
     """Base class for data writers that handle PyArrow tables directly."""
 
-    def __init__(self, table, partition: Tuple, bucket: int):
+    def __init__(self, table, partition: Tuple, bucket: int, max_seq_number: int):
         from pypaimon.table.file_store_table import FileStoreTable
 
         self.table: FileStoreTable = table
@@ -50,6 +52,7 @@ class DataWriter(ABC):
                                        if self.bucket != 
BucketMode.POSTPONE_BUCKET.value
                                        else CoreOptions.FILE_FORMAT_AVRO)
         self.compression = options.get(CoreOptions.FILE_COMPRESSION, "zstd")
+        self.sequence_generator = SequenceGenerator(max_seq_number)
 
         self.pending_data: Optional[pa.RecordBatch] = None
         self.committed_files: List[DataFileMeta] = []
@@ -89,7 +92,7 @@ class DataWriter(ABC):
 
         current_size = self.pending_data.get_total_buffer_size()
         if current_size > self.target_file_size:
-            split_row = _find_optimal_split_point(self.pending_data, self.target_file_size)
+            split_row = self._find_optimal_split_point(self.pending_data, self.target_file_size)
             if split_row > 0:
                 data_to_write = self.pending_data.slice(0, split_row)
                 remaining_data = self.pending_data.slice(split_row)
@@ -101,7 +104,7 @@ class DataWriter(ABC):
     def _write_data_to_file(self, data: pa.RecordBatch):
         if data.num_rows == 0:
             return
-        file_name = f"data-{uuid.uuid4()}.{self.file_format}"
+        file_name = f"data-{uuid.uuid4()}-0.{self.file_format}"
         file_path = self._generate_file_path(file_name)
         if self.file_format == CoreOptions.FILE_FORMAT_PARQUET:
             self.file_io.write_parquet(file_path, data, 
compression=self.compression)
@@ -112,24 +115,55 @@ class DataWriter(ABC):
         else:
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
+        # min key & max key
         key_columns_batch = data.select(self.trimmed_primary_key)
         min_key_row_batch = key_columns_batch.slice(0, 1)
-        min_key_data = [col.to_pylist()[0] for col in min_key_row_batch.columns]
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
-        max_key_data = [col.to_pylist()[0] for col in max_key_row_batch.columns]
+        min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns]
+        max_key = [col.to_pylist()[0] for col in max_key_row_batch.columns]
+
+        # key stats & value stats
+        column_stats = {
+            field.name: self._get_column_stats(data, field.name)
+            for field in self.table.table_schema.fields
+        }
+        all_fields = self.table.table_schema.fields
+        min_value_stats = [column_stats[field.name]['min_value'] for field in all_fields]
+        max_value_stats = [column_stats[field.name]['max_value'] for field in all_fields]
+        value_null_counts = [column_stats[field.name]['null_count'] for field in all_fields]
+        key_fields = self.trimmed_primary_key_fields
+        min_key_stats = [column_stats[field.name]['min_value'] for field in key_fields]
+        max_key_stats = [column_stats[field.name]['max_value'] for field in key_fields]
+        key_null_counts = [column_stats[field.name]['null_count'] for field in key_fields]
+        if not all(count == 0 for count in key_null_counts):
+            raise RuntimeError("Primary key should not be null")
+
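+        # Record the sequence-number range covered by this file, then advance the generator's window start.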
+        min_seq = self.sequence_generator.start
+        max_seq = self.sequence_generator.current
+        self.sequence_generator.start = self.sequence_generator.current
         self.committed_files.append(DataFileMeta(
             file_name=file_name,
             file_size=self.file_io.get_file_size(file_path),
             row_count=data.num_rows,
-            min_key=BinaryRow(min_key_data, self.trimmed_primary_key_fields),
-            max_key=BinaryRow(max_key_data, self.trimmed_primary_key_fields),
-            key_stats=None,  # TODO
-            value_stats=None,
-            min_sequence_number=0,
-            max_sequence_number=0,
-            schema_id=0,
+            min_key=BinaryRow(min_key, self.trimmed_primary_key_fields),
+            max_key=BinaryRow(max_key, self.trimmed_primary_key_fields),
+            key_stats=SimpleStats(
+                BinaryRow(min_key_stats, self.trimmed_primary_key_fields),
+                BinaryRow(max_key_stats, self.trimmed_primary_key_fields),
+                key_null_counts,
+            ),
+            value_stats=SimpleStats(
+                BinaryRow(min_value_stats, self.table.table_schema.fields),
+                BinaryRow(max_value_stats, self.table.table_schema.fields),
+                value_null_counts,
+            ),
+            min_sequence_number=min_seq,
+            max_sequence_number=max_seq,
+            schema_id=self.table.table_schema.id,
             level=0,
-            extra_files=None,
+            extra_files=[],
+            creation_time=datetime.now(),
+            delete_row_count=0,
             file_path=str(file_path),
         ))
 
@@ -146,24 +180,52 @@ class DataWriter(ABC):
 
         return path_builder
 
-
-def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
-    total_rows = data.num_rows
-    if total_rows <= 1:
-        return 0
-
-    left, right = 1, total_rows
-    best_split = 0
-
-    while left <= right:
-        mid = (left + right) // 2
-        slice_data = data.slice(0, mid)
-        slice_size = slice_data.get_total_buffer_size()
-
-        if slice_size <= target_size:
-            best_split = mid
-            left = mid + 1
-        else:
-            right = mid - 1
-
-    return best_split
+    @staticmethod
+    def _find_optimal_split_point(data: pa.RecordBatch, target_size: int) -> int:
+        total_rows = data.num_rows
+        if total_rows <= 1:
+            return 0
+
+        left, right = 1, total_rows
+        best_split = 0
+
+        while left <= right:
+            mid = (left + right) // 2
+            slice_data = data.slice(0, mid)
+            slice_size = slice_data.get_total_buffer_size()
+
+            if slice_size <= target_size:
+                best_split = mid
+                left = mid + 1
+            else:
+                right = mid - 1
+
+        return best_split
+
+    @staticmethod
+    def _get_column_stats(record_batch: pa.RecordBatch, column_name: str) -> dict:
+        column_array = record_batch.column(column_name)
+        if column_array.null_count == len(column_array):
+            return {
+                "min_value": None,
+                "max_value": None,
+                "null_count": column_array.null_count,
+            }
+        min_value = pc.min(column_array).as_py()
+        max_value = pc.max(column_array).as_py()
+        null_count = column_array.null_count
+        return {
+            "min_value": min_value,
+            "max_value": max_value,
+            "null_count": null_count,
+        }
+
+
+class SequenceGenerator:
+    def __init__(self, start: int = 0):
+        self.start = start
+        self.current = start
+
+    def next(self) -> int:
+        self.current += 1
+        return self.current
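As an aside (not part of the patch), the relocated `_find_optimal_split_point` is a largest-prefix-under-budget binary search. A minimal standalone sketch of the same idea, with the Arrow buffer measurement replaced by a stand-in per-row cost so the result is easy to verify:

    def largest_prefix_within(total_rows: int, target_size: int, size_of) -> int:
        # returns the largest n in 0..total_rows with size_of(n) <= target_size,
        # assuming size_of is non-decreasing in n
        if total_rows <= 1:
            return 0
        left, right = 1, total_rows
        best = 0
        while left <= right:
            mid = (left + right) // 2
            if size_of(mid) <= target_size:
                best = mid
                left = mid + 1
            else:
                right = mid - 1
        return best

    # assume ~64 bytes per row and a 4 KiB target: 64 rows fit
    print(largest_prefix_within(1000, 4096, lambda n: n * 64))  # -> 64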
diff --git a/paimon-python/pypaimon/write/writer/key_value_data_writer.py b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
index dbf47496c9..99b11a9788 100644
--- a/paimon-python/pypaimon/write/writer/key_value_data_writer.py
+++ b/paimon-python/pypaimon/write/writer/key_value_data_writer.py
@@ -22,18 +22,6 @@ import pyarrow.compute as pc
 from pypaimon.write.writer.data_writer import DataWriter
 
 
-class SequenceGenerator:
-    def __init__(self, start: int = 0):
-        self.current = start
-
-    def next(self) -> int:
-        self.current += 1
-        return self.current
-
-
-sequence_generator = SequenceGenerator()
-
-
 class KeyValueDataWriter(DataWriter):
     """Data writer for primary key tables with system fields and sorting."""
 
@@ -55,11 +43,11 @@ class KeyValueDataWriter(DataWriter):
                 key_column = data.column(pk_key)
                 enhanced_table = enhanced_table.add_column(0, f'_KEY_{pk_key}', key_column)
 
-        sequence_column = pa.array([sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
+        sequence_column = pa.array([self.sequence_generator.next() for _ in range(num_rows)], type=pa.int64())
         enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key), '_SEQUENCE_NUMBER', sequence_column)
 
         # TODO: support real row kind here
-        value_kind_column = pa.repeat(0, num_rows)
+        value_kind_column = pa.array([0] * num_rows, type=pa.int32())
         enhanced_table = enhanced_table.add_column(len(self.trimmed_primary_key) + 1, '_VALUE_KIND',
                                                    value_kind_column)
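For context (not part of the patch), a toy sketch of what the add_column calls above do to the schema; the column names, positions, and data are hypothetical, and a plain pa.Table is used here for simplicity:

    import pyarrow as pa

    table = pa.table({"_KEY_id": [1, 2], "id": [1, 2], "v": ["a", "b"]})
    seq = pa.array([10, 11], type=pa.int64())               # per-row sequence numbers
    kind = pa.array([0] * table.num_rows, type=pa.int32())  # 0 stands in for INSERT

    table = table.add_column(1, '_SEQUENCE_NUMBER', seq)
    table = table.add_column(2, '_VALUE_KIND', kind)
    print(table.column_names)
    # ['_KEY_id', '_SEQUENCE_NUMBER', '_VALUE_KIND', 'id', 'v']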
 
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index 507844685c..a00a2ac487 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -28,7 +28,8 @@ install_requires = [
     'ossfs==2023.12.0',
     'pyarrow==16.0.0',
     'polars==1.32.0',
-    'fastavro==1.11.1'
+    'fastavro==1.11.1',
+    'zstandard==0.24.0'
 ]
 
 long_description = "See Apache Paimon Python API \
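A quick, hypothetical sanity check (not part of the patch) for the new zstandard pin, which is presumably there so fastavro can read and write zstd-compressed Avro blocks:

    import io
    import fastavro

    schema = {"type": "record", "name": "r",
              "fields": [{"name": "id", "type": "long"}]}
    buf = io.BytesIO()
    fastavro.writer(buf, schema, [{"id": 1}, {"id": 2}], codec="zstandard")
    buf.seek(0)
    print([rec["id"] for rec in fastavro.reader(buf)])  # [1, 2]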
