This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 54e3332e2a [python] add error handling to abort pending commits  and 
serialization support for ray sink (#7022)
54e3332e2a is described below

commit 54e3332e2aa1971a0890653e6bf876374aea5716
Author: XiaoHongbo <[email protected]>
AuthorDate: Tue Jan 20 15:37:13 2026 +0800

    [python] add error handling to abort pending commits  and serialization 
support for ray sink (#7022)
---
 docs/content/program-api/python-api.md             |  16 +-
 paimon-python/dev/.rat-excludes                    |   1 +
 .../sample/data/__init__.py}                       |   3 -
 paimon-python/pypaimon/sample/data/data.jsonl      | 100 ++++++
 .../sample/rest_catalog_ray_sink_sample.py         | 153 +++++++++
 paimon-python/pypaimon/tests/ray_data_test.py      |   2 +-
 paimon-python/pypaimon/tests/ray_sink_test.py      | 343 +++++++++++++++++++++
 paimon-python/pypaimon/write/ray_datasink.py       | 214 +++++++++----
 paimon-python/pypaimon/write/table_write.py        |  31 +-
 pom.xml                                            |   1 +
 10 files changed, 798 insertions(+), 66 deletions(-)

diff --git a/docs/content/program-api/python-api.md 
b/docs/content/program-api/python-api.md
index aa7773e049..606937585c 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -181,7 +181,7 @@ write_builder = table.new_batch_write_builder()
 table_write = write_builder.new_write()
 table_commit = write_builder.new_commit()
 
-# 2. Write data. Support 3 methods:
+# 2. Write data. Support 4 methods:
 # 2.1 Write pandas.DataFrame
 dataframe = ...
 table_write.write_pandas(dataframe)
@@ -194,7 +194,19 @@ table_write.write_arrow(pa_table)
 record_batch = ...
 table_write.write_arrow_batch(record_batch)
 
-# 3. Commit data
+# 2.4 Write Ray Dataset (requires ray to be installed)
+import ray
+ray_dataset = ray.data.read_json("/path/to/data.jsonl")
+table_write.write_ray(ray_dataset, overwrite=False, concurrency=2)
+# Parameters:
+#   - dataset: Ray Dataset to write
+#   - overwrite: Whether to overwrite existing data (default: False)
+#   - concurrency: Optional max number of concurrent Ray tasks
+#   - ray_remote_args: Optional kwargs passed to ray.remote() (e.g., 
{"num_cpus": 2})
+# Note: write_ray() handles commit internally through Ray Datasink API.
+#       Skip steps 3-4 if using write_ray() - just close the writer.
+
+# 3. Commit data (required for write_pandas/write_arrow/write_arrow_batch only)
 commit_messages = table_write.prepare_commit()
 table_commit.commit(commit_messages)
 
diff --git a/paimon-python/dev/.rat-excludes b/paimon-python/dev/.rat-excludes
index 576371afae..421839c0f1 100644
--- a/paimon-python/dev/.rat-excludes
+++ b/paimon-python/dev/.rat-excludes
@@ -18,3 +18,4 @@
 
 .gitignore
 rat-results.txt
+.*\.jsonl$
diff --git a/paimon-python/dev/.rat-excludes 
b/paimon-python/pypaimon/sample/data/__init__.py
similarity index 97%
copy from paimon-python/dev/.rat-excludes
copy to paimon-python/pypaimon/sample/data/__init__.py
index 576371afae..65b48d4d79 100644
--- a/paimon-python/dev/.rat-excludes
+++ b/paimon-python/pypaimon/sample/data/__init__.py
@@ -15,6 +15,3 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 
################################################################################
-
-.gitignore
-rat-results.txt
diff --git a/paimon-python/pypaimon/sample/data/data.jsonl 
b/paimon-python/pypaimon/sample/data/data.jsonl
new file mode 100644
index 0000000000..f33de19c57
--- /dev/null
+++ b/paimon-python/pypaimon/sample/data/data.jsonl
@@ -0,0 +1,100 @@
+{"id": 1, "name": "Item_1", "category": "B", "value": 12.8, "score": 55, 
"timestamp": "2024-01-02T10:00:00Z"}
+{"id": 2, "name": "Item_2", "category": "C", "value": 15.1, "score": 60, 
"timestamp": "2024-01-03T10:00:00Z"}
+{"id": 3, "name": "Item_3", "category": "A", "value": 17.4, "score": 65, 
"timestamp": "2024-01-04T10:00:00Z"}
+{"id": 4, "name": "Item_4", "category": "B", "value": 19.7, "score": 70, 
"timestamp": "2024-01-05T10:00:00Z"}
+{"id": 5, "name": "Item_5", "category": "C", "value": 22.0, "score": 75, 
"timestamp": "2024-01-06T10:00:00Z"}
+{"id": 6, "name": "Item_6", "category": "A", "value": 24.299999999999997, 
"score": 80, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 7, "name": "Item_7", "category": "B", "value": 26.599999999999998, 
"score": 85, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 8, "name": "Item_8", "category": "C", "value": 28.9, "score": 90, 
"timestamp": "2024-01-09T10:00:00Z"}
+{"id": 9, "name": "Item_9", "category": "A", "value": 31.2, "score": 95, 
"timestamp": "2024-01-10T10:00:00Z"}
+{"id": 10, "name": "Item_10", "category": "B", "value": 33.5, "score": 100, 
"timestamp": "2024-01-11T10:00:00Z"}
+{"id": 11, "name": "Item_11", "category": "C", "value": 35.8, "score": 105, 
"timestamp": "2024-01-12T10:00:00Z"}
+{"id": 12, "name": "Item_12", "category": "A", "value": 38.099999999999994, 
"score": 110, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 13, "name": "Item_13", "category": "B", "value": 40.4, "score": 115, 
"timestamp": "2024-01-14T10:00:00Z"}
+{"id": 14, "name": "Item_14", "category": "C", "value": 42.699999999999996, 
"score": 120, "timestamp": "2024-01-15T10:00:00Z"}
+{"id": 15, "name": "Item_15", "category": "A", "value": 45.0, "score": 125, 
"timestamp": "2024-01-16T10:00:00Z"}
+{"id": 16, "name": "Item_16", "category": "B", "value": 47.3, "score": 130, 
"timestamp": "2024-01-17T10:00:00Z"}
+{"id": 17, "name": "Item_17", "category": "C", "value": 49.599999999999994, 
"score": 135, "timestamp": "2024-01-18T10:00:00Z"}
+{"id": 18, "name": "Item_18", "category": "A", "value": 51.9, "score": 140, 
"timestamp": "2024-01-19T10:00:00Z"}
+{"id": 19, "name": "Item_19", "category": "B", "value": 54.199999999999996, 
"score": 145, "timestamp": "2024-01-20T10:00:00Z"}
+{"id": 20, "name": "Item_20", "category": "C", "value": 56.5, "score": 150, 
"timestamp": "2024-01-21T10:00:00Z"}
+{"id": 21, "name": "Item_21", "category": "A", "value": 58.8, "score": 155, 
"timestamp": "2024-01-22T10:00:00Z"}
+{"id": 22, "name": "Item_22", "category": "B", "value": 61.099999999999994, 
"score": 160, "timestamp": "2024-01-23T10:00:00Z"}
+{"id": 23, "name": "Item_23", "category": "C", "value": 63.4, "score": 165, 
"timestamp": "2024-01-24T10:00:00Z"}
+{"id": 24, "name": "Item_24", "category": "A", "value": 65.69999999999999, 
"score": 170, "timestamp": "2024-01-25T10:00:00Z"}
+{"id": 25, "name": "Item_25", "category": "B", "value": 68.0, "score": 175, 
"timestamp": "2024-01-26T10:00:00Z"}
+{"id": 26, "name": "Item_26", "category": "C", "value": 70.3, "score": 180, 
"timestamp": "2024-01-27T10:00:00Z"}
+{"id": 27, "name": "Item_27", "category": "A", "value": 72.6, "score": 185, 
"timestamp": "2024-01-28T10:00:00Z"}
+{"id": 28, "name": "Item_28", "category": "B", "value": 74.89999999999999, 
"score": 190, "timestamp": "2024-01-01T10:00:00Z"}
+{"id": 29, "name": "Item_29", "category": "C", "value": 77.19999999999999, 
"score": 195, "timestamp": "2024-01-02T10:00:00Z"}
+{"id": 30, "name": "Item_30", "category": "A", "value": 79.5, "score": 200, 
"timestamp": "2024-01-03T10:00:00Z"}
+{"id": 31, "name": "Item_31", "category": "B", "value": 81.8, "score": 205, 
"timestamp": "2024-01-04T10:00:00Z"}
+{"id": 32, "name": "Item_32", "category": "C", "value": 84.1, "score": 210, 
"timestamp": "2024-01-05T10:00:00Z"}
+{"id": 33, "name": "Item_33", "category": "A", "value": 86.39999999999999, 
"score": 215, "timestamp": "2024-01-06T10:00:00Z"}
+{"id": 34, "name": "Item_34", "category": "B", "value": 88.69999999999999, 
"score": 220, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 35, "name": "Item_35", "category": "C", "value": 91.0, "score": 225, 
"timestamp": "2024-01-08T10:00:00Z"}
+{"id": 36, "name": "Item_36", "category": "A", "value": 93.3, "score": 230, 
"timestamp": "2024-01-09T10:00:00Z"}
+{"id": 37, "name": "Item_37", "category": "B", "value": 95.6, "score": 235, 
"timestamp": "2024-01-10T10:00:00Z"}
+{"id": 38, "name": "Item_38", "category": "C", "value": 97.89999999999999, 
"score": 240, "timestamp": "2024-01-11T10:00:00Z"}
+{"id": 39, "name": "Item_39", "category": "A", "value": 100.19999999999999, 
"score": 245, "timestamp": "2024-01-12T10:00:00Z"}
+{"id": 40, "name": "Item_40", "category": "B", "value": 102.5, "score": 250, 
"timestamp": "2024-01-13T10:00:00Z"}
+{"id": 41, "name": "Item_41", "category": "C", "value": 104.8, "score": 255, 
"timestamp": "2024-01-14T10:00:00Z"}
+{"id": 42, "name": "Item_42", "category": "A", "value": 107.1, "score": 260, 
"timestamp": "2024-01-15T10:00:00Z"}
+{"id": 43, "name": "Item_43", "category": "B", "value": 109.39999999999999, 
"score": 265, "timestamp": "2024-01-16T10:00:00Z"}
+{"id": 44, "name": "Item_44", "category": "C", "value": 111.69999999999999, 
"score": 270, "timestamp": "2024-01-17T10:00:00Z"}
+{"id": 45, "name": "Item_45", "category": "A", "value": 113.99999999999999, 
"score": 275, "timestamp": "2024-01-18T10:00:00Z"}
+{"id": 46, "name": "Item_46", "category": "B", "value": 116.3, "score": 280, 
"timestamp": "2024-01-19T10:00:00Z"}
+{"id": 47, "name": "Item_47", "category": "C", "value": 118.6, "score": 285, 
"timestamp": "2024-01-20T10:00:00Z"}
+{"id": 48, "name": "Item_48", "category": "A", "value": 120.89999999999999, 
"score": 290, "timestamp": "2024-01-21T10:00:00Z"}
+{"id": 49, "name": "Item_49", "category": "B", "value": 123.19999999999999, 
"score": 295, "timestamp": "2024-01-22T10:00:00Z"}
+{"id": 50, "name": "Item_50", "category": "C", "value": 125.49999999999999, 
"score": 300, "timestamp": "2024-01-23T10:00:00Z"}
+{"id": 51, "name": "Item_51", "category": "A", "value": 127.8, "score": 305, 
"timestamp": "2024-01-24T10:00:00Z"}
+{"id": 52, "name": "Item_52", "category": "B", "value": 130.1, "score": 310, 
"timestamp": "2024-01-25T10:00:00Z"}
+{"id": 53, "name": "Item_53", "category": "C", "value": 132.39999999999998, 
"score": 315, "timestamp": "2024-01-26T10:00:00Z"}
+{"id": 54, "name": "Item_54", "category": "A", "value": 134.7, "score": 320, 
"timestamp": "2024-01-27T10:00:00Z"}
+{"id": 55, "name": "Item_55", "category": "B", "value": 137.0, "score": 325, 
"timestamp": "2024-01-28T10:00:00Z"}
+{"id": 56, "name": "Item_56", "category": "C", "value": 139.29999999999998, 
"score": 330, "timestamp": "2024-01-01T10:00:00Z"}
+{"id": 57, "name": "Item_57", "category": "A", "value": 141.6, "score": 335, 
"timestamp": "2024-01-02T10:00:00Z"}
+{"id": 58, "name": "Item_58", "category": "B", "value": 143.89999999999998, 
"score": 340, "timestamp": "2024-01-03T10:00:00Z"}
+{"id": 59, "name": "Item_59", "category": "C", "value": 146.2, "score": 345, 
"timestamp": "2024-01-04T10:00:00Z"}
+{"id": 60, "name": "Item_60", "category": "A", "value": 148.5, "score": 350, 
"timestamp": "2024-01-05T10:00:00Z"}
+{"id": 61, "name": "Item_61", "category": "B", "value": 150.79999999999998, 
"score": 355, "timestamp": "2024-01-06T10:00:00Z"}
+{"id": 62, "name": "Item_62", "category": "C", "value": 153.1, "score": 360, 
"timestamp": "2024-01-07T10:00:00Z"}
+{"id": 63, "name": "Item_63", "category": "A", "value": 155.39999999999998, 
"score": 365, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 64, "name": "Item_64", "category": "B", "value": 157.7, "score": 370, 
"timestamp": "2024-01-09T10:00:00Z"}
+{"id": 65, "name": "Item_65", "category": "C", "value": 160.0, "score": 375, 
"timestamp": "2024-01-10T10:00:00Z"}
+{"id": 66, "name": "Item_66", "category": "A", "value": 162.29999999999998, 
"score": 380, "timestamp": "2024-01-11T10:00:00Z"}
+{"id": 67, "name": "Item_67", "category": "B", "value": 164.6, "score": 385, 
"timestamp": "2024-01-12T10:00:00Z"}
+{"id": 68, "name": "Item_68", "category": "C", "value": 166.89999999999998, 
"score": 390, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 69, "name": "Item_69", "category": "A", "value": 169.2, "score": 395, 
"timestamp": "2024-01-14T10:00:00Z"}
+{"id": 70, "name": "Item_70", "category": "B", "value": 171.5, "score": 400, 
"timestamp": "2024-01-15T10:00:00Z"}
+{"id": 71, "name": "Item_71", "category": "C", "value": 173.79999999999998, 
"score": 405, "timestamp": "2024-01-16T10:00:00Z"}
+{"id": 72, "name": "Item_72", "category": "A", "value": 176.1, "score": 410, 
"timestamp": "2024-01-17T10:00:00Z"}
+{"id": 73, "name": "Item_73", "category": "B", "value": 178.39999999999998, 
"score": 415, "timestamp": "2024-01-18T10:00:00Z"}
+{"id": 74, "name": "Item_74", "category": "C", "value": 180.7, "score": 420, 
"timestamp": "2024-01-19T10:00:00Z"}
+{"id": 75, "name": "Item_75", "category": "A", "value": 183.0, "score": 425, 
"timestamp": "2024-01-20T10:00:00Z"}
+{"id": 76, "name": "Item_76", "category": "B", "value": 185.29999999999998, 
"score": 430, "timestamp": "2024-01-21T10:00:00Z"}
+{"id": 77, "name": "Item_77", "category": "C", "value": 187.6, "score": 435, 
"timestamp": "2024-01-22T10:00:00Z"}
+{"id": 78, "name": "Item_78", "category": "A", "value": 189.89999999999998, 
"score": 440, "timestamp": "2024-01-23T10:00:00Z"}
+{"id": 79, "name": "Item_79", "category": "B", "value": 192.2, "score": 445, 
"timestamp": "2024-01-24T10:00:00Z"}
+{"id": 80, "name": "Item_80", "category": "C", "value": 194.5, "score": 450, 
"timestamp": "2024-01-25T10:00:00Z"}
+{"id": 81, "name": "Item_81", "category": "A", "value": 196.79999999999998, 
"score": 455, "timestamp": "2024-01-26T10:00:00Z"}
+{"id": 82, "name": "Item_82", "category": "B", "value": 199.1, "score": 460, 
"timestamp": "2024-01-27T10:00:00Z"}
+{"id": 83, "name": "Item_83", "category": "C", "value": 201.39999999999998, 
"score": 465, "timestamp": "2024-01-28T10:00:00Z"}
+{"id": 84, "name": "Item_84", "category": "A", "value": 203.7, "score": 470, 
"timestamp": "2024-01-01T10:00:00Z"}
+{"id": 85, "name": "Item_85", "category": "B", "value": 205.99999999999997, 
"score": 475, "timestamp": "2024-01-02T10:00:00Z"}
+{"id": 86, "name": "Item_86", "category": "C", "value": 208.29999999999998, 
"score": 480, "timestamp": "2024-01-03T10:00:00Z"}
+{"id": 87, "name": "Item_87", "category": "A", "value": 210.6, "score": 485, 
"timestamp": "2024-01-04T10:00:00Z"}
+{"id": 88, "name": "Item_88", "category": "B", "value": 212.89999999999998, 
"score": 490, "timestamp": "2024-01-05T10:00:00Z"}
+{"id": 89, "name": "Item_89", "category": "C", "value": 215.2, "score": 495, 
"timestamp": "2024-01-06T10:00:00Z"}
+{"id": 90, "name": "Item_90", "category": "A", "value": 217.49999999999997, 
"score": 500, "timestamp": "2024-01-07T10:00:00Z"}
+{"id": 91, "name": "Item_91", "category": "B", "value": 219.79999999999998, 
"score": 505, "timestamp": "2024-01-08T10:00:00Z"}
+{"id": 92, "name": "Item_92", "category": "C", "value": 222.1, "score": 510, 
"timestamp": "2024-01-09T10:00:00Z"}
+{"id": 93, "name": "Item_93", "category": "A", "value": 224.39999999999998, 
"score": 515, "timestamp": "2024-01-10T10:00:00Z"}
+{"id": 94, "name": "Item_94", "category": "B", "value": 226.7, "score": 520, 
"timestamp": "2024-01-11T10:00:00Z"}
+{"id": 95, "name": "Item_95", "category": "C", "value": 228.99999999999997, 
"score": 525, "timestamp": "2024-01-12T10:00:00Z"}
+{"id": 96, "name": "Item_96", "category": "A", "value": 231.29999999999998, 
"score": 530, "timestamp": "2024-01-13T10:00:00Z"}
+{"id": 97, "name": "Item_97", "category": "B", "value": 233.6, "score": 535, 
"timestamp": "2024-01-14T10:00:00Z"}
+{"id": 98, "name": "Item_98", "category": "C", "value": 235.89999999999998, 
"score": 540, "timestamp": "2024-01-15T10:00:00Z"}
+{"id": 99, "name": "Item_99", "category": "A", "value": 238.2, "score": 545, 
"timestamp": "2024-01-16T10:00:00Z"}
+{"id": 100, "name": "Item_100", "category": "B", "value": 240.49999999999997, 
"score": 550, "timestamp": "2024-01-17T10:00:00Z"}
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py 
b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py
new file mode 100644
index 0000000000..83264854d7
--- /dev/null
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_sink_sample.py
@@ -0,0 +1,153 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+"""
+Example: REST Catalog + Ray Sink with Local File + Mock Server
+
+Demonstrates reading JSON data from local file using Ray Data and writing to 
Paimon table
+using Ray Sink (write_ray) with a mock REST catalog server.
+"""
+
+import os
+import tempfile
+import uuid
+
+import pyarrow as pa
+import ray
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.common.options.core_options import CoreOptions
+from pypaimon.schema.data_types import PyarrowFieldParser
+from pypaimon.tests.rest.rest_server import RESTCatalogServer
+from pypaimon.api.api_response import ConfigResponse
+from pypaimon.api.auth import BearTokenAuthProvider
+
+
+def _get_sample_data_path(filename: str) -> str:
+    sample_dir = os.path.dirname(os.path.abspath(__file__))
+    return os.path.join(sample_dir, 'data', filename)
+
+
+def main():
+    ray.init(ignore_reinit_error=True, num_cpus=2)
+    
+    temp_dir = tempfile.mkdtemp()
+    token = str(uuid.uuid4())
+    server = RESTCatalogServer(
+        data_path=temp_dir,
+        auth_provider=BearTokenAuthProvider(token),
+        config=ConfigResponse(defaults={"prefix": "mock-test"}),
+        warehouse="warehouse"
+    )
+    server.start()
+    print(f"REST server started at: {server.get_url()}")
+    
+    json_file_path = _get_sample_data_path('data.jsonl')
+    
+    table_write = None
+    try:
+        catalog = CatalogFactory.create({
+            'metastore': 'rest',
+            'uri': f"http://localhost:{server.port}";,
+            'warehouse': "warehouse",
+            'token.provider': 'bear',
+            'token': token,
+        })
+        catalog.create_database("default", ignore_if_exists=True)
+        
+        schema = Schema.from_pyarrow_schema(pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('category', pa.string()),
+            ('value', pa.float64()),
+            ('score', pa.int64()),
+            ('timestamp', pa.timestamp('s')),
+        ]), primary_keys=['id'], options={
+            CoreOptions.BUCKET.key(): '4'
+        })
+        
+        table_name = 'default.oss_json_import_table'
+        catalog.create_table(table_name, schema, ignore_if_exists=True)
+        table = catalog.get_table(table_name)
+        
+        print(f"Reading JSON from local file: {json_file_path}")
+        ray_dataset = ray.data.read_json(
+            json_file_path,
+            concurrency=2,
+        )
+        
+        print(f"Ray Dataset: {ray_dataset.count()} rows")
+        print(f"Schema: {ray_dataset.schema()}")
+        
+        table_pa_schema = 
PyarrowFieldParser.from_paimon_schema(table.table_schema.fields)
+        
+        def cast_batch_to_table_schema(batch: pa.RecordBatch) -> pa.Table:
+            arrays = []
+            for field in table_pa_schema:
+                col_name = field.name
+                col_array = batch.column(col_name)
+                if isinstance(col_array, pa.ChunkedArray):
+                    col_array = col_array.combine_chunks()
+                if col_array.type != field.type:
+                    col_array = col_array.cast(field.type)
+                arrays.append(col_array)
+            record_batch = pa.RecordBatch.from_arrays(arrays, 
schema=table_pa_schema)
+            return pa.Table.from_batches([record_batch])
+        
+        ray_dataset = ray_dataset.map_batches(
+            cast_batch_to_table_schema,
+            batch_format="pyarrow",
+        )
+        
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+        
+        table_write.write_ray(
+            ray_dataset,
+            overwrite=False,
+            concurrency=2,
+            ray_remote_args={"num_cpus": 1}
+        )
+        
+        # write_ray() has already committed the data, just close the writer
+        table_write.close()
+        table_write = None
+        
+        print(f"Successfully wrote {ray_dataset.count()} rows to table")
+        
+        read_builder = table.new_read_builder()
+        table_read = read_builder.new_read()
+        table_scan = read_builder.new_scan()
+        splits = table_scan.plan().splits()
+        
+        result_df = table_read.to_pandas(splits)
+        print(f"Read back {len(result_df)} rows from table")
+        print(result_df.head())
+        
+    finally:
+        if table_write is not None:
+            try:
+                table_write.close()
+            except Exception:
+                pass
+        server.shutdown()
+        if ray.is_initialized():
+            ray.shutdown()
+
+
+if __name__ == '__main__':
+    main()
diff --git a/paimon-python/pypaimon/tests/ray_data_test.py 
b/paimon-python/pypaimon/tests/ray_data_test.py
index e931a4c7dc..00995d6976 100644
--- a/paimon-python/pypaimon/tests/ray_data_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_test.py
@@ -155,7 +155,7 @@ class RayDataTest(unittest.TestCase):
         ds = from_arrow(test_data)
         write_builder = table.new_batch_write_builder()
         writer = write_builder.new_write()
-        writer.write_raydata(ds, parallelism=2)
+        writer.write_ray(ds, concurrency=2)
         # Read using Ray Data
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
diff --git a/paimon-python/pypaimon/tests/ray_sink_test.py 
b/paimon-python/pypaimon/tests/ray_sink_test.py
new file mode 100644
index 0000000000..bc1ff3ef9f
--- /dev/null
+++ b/paimon-python/pypaimon/tests/ray_sink_test.py
@@ -0,0 +1,343 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import tempfile
+import unittest
+from unittest.mock import Mock, patch
+
+import pyarrow as pa
+from ray.data._internal.execution.interfaces import TaskContext
+
+from pypaimon import CatalogFactory, Schema
+from pypaimon.write.ray_datasink import PaimonDatasink
+from pypaimon.write.commit_message import CommitMessage
+
+
+class RaySinkTest(unittest.TestCase):
+    def setUp(self):
+        self.temp_dir = tempfile.mkdtemp()
+        self.warehouse_path = os.path.join(self.temp_dir, "warehouse")
+        os.makedirs(self.warehouse_path, exist_ok=True)
+
+        catalog_options = {
+            "warehouse": self.warehouse_path
+        }
+        self.catalog = CatalogFactory.create(catalog_options)
+        self.catalog.create_database("test_db", ignore_if_exists=True)
+
+        pa_schema = pa.schema([
+            ('id', pa.int64()),
+            ('name', pa.string()),
+            ('value', pa.float64())
+        ])
+
+        schema = Schema.from_pyarrow_schema(
+            pa_schema=pa_schema,
+            partition_keys=None,
+            primary_keys=['id'],
+            options={'bucket': '2'},  # Use fixed bucket mode for testing
+            comment='test table'
+        )
+
+        self.table_identifier = "test_db.test_table"
+        self.catalog.create_table(self.table_identifier, schema, 
ignore_if_exists=False)
+        self.table = self.catalog.get_table(self.table_identifier)
+
+    def tearDown(self):
+        import shutil
+        if os.path.exists(self.temp_dir):
+            shutil.rmtree(self.temp_dir)
+
+    def test_init_and_serialization(self):
+        """Test initialization, serialization, and table name."""
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        self.assertEqual(datasink.table, self.table)
+        self.assertFalse(datasink.overwrite)
+        self.assertIsNone(datasink._writer_builder)
+        self.assertEqual(datasink._table_name, "test_db.test_table")
+
+        datasink_overwrite = PaimonDatasink(self.table, overwrite=True)
+        self.assertTrue(datasink_overwrite.overwrite)
+
+        # Test serialization
+        datasink._writer_builder = Mock()
+        state = datasink.__getstate__()
+        self.assertIn('table', state)
+        self.assertIn('overwrite', state)
+        self.assertIn('_writer_builder', state)
+
+        new_datasink = PaimonDatasink.__new__(PaimonDatasink)
+        new_datasink.__setstate__(state)
+        self.assertEqual(new_datasink.table, self.table)
+        self.assertFalse(new_datasink.overwrite)
+
+    def test_table_and_writer_builder_serializable(self):
+        import pickle
+        try:
+            pickled_table = pickle.dumps(self.table)
+            unpickled_table = pickle.loads(pickled_table)
+            self.assertIsNotNone(unpickled_table)
+            builder = unpickled_table.new_batch_write_builder()
+            self.assertIsNotNone(builder)
+        except Exception as e:
+            self.fail(f"Table object is not serializable: {e}")
+        
+        writer_builder = self.table.new_batch_write_builder()
+        try:
+            pickled_builder = pickle.dumps(writer_builder)
+            unpickled_builder = pickle.loads(pickled_builder)
+            self.assertIsNotNone(unpickled_builder)
+            table_write = unpickled_builder.new_write()
+            self.assertIsNotNone(table_write)
+            table_write.close()
+        except Exception as e:
+            self.fail(f"WriterBuilder is not serializable: {e}")
+        
+        overwrite_builder = self.table.new_batch_write_builder().overwrite()
+        try:
+            pickled_overwrite = pickle.dumps(overwrite_builder)
+            unpickled_overwrite = pickle.loads(pickled_overwrite)
+            self.assertIsNotNone(unpickled_overwrite)
+            # static_partition is a dict, empty dict {} means overwrite all 
partitions
+            self.assertIsNotNone(unpickled_overwrite.static_partition)
+            self.assertIsInstance(unpickled_overwrite.static_partition, dict)
+        except Exception as e:
+            self.fail(f"Overwrite WriterBuilder is not serializable: {e}")
+
+    def test_on_write_start(self):
+        """Test on_write_start with normal and overwrite modes."""
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        self.assertIsNotNone(datasink._writer_builder)
+        self.assertFalse(datasink._writer_builder.static_partition)
+
+        datasink_overwrite = PaimonDatasink(self.table, overwrite=True)
+        datasink_overwrite.on_write_start()
+        
self.assertIsNotNone(datasink_overwrite._writer_builder.static_partition)
+
+    def test_write(self):
+        """Test write method: empty blocks, multiple blocks, error handling, 
and resource cleanup."""
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        ctx = Mock(spec=TaskContext)
+
+        # Test empty block
+        empty_table = pa.table({
+            'id': pa.array([], type=pa.int64()),
+            'name': pa.array([], type=pa.string()),
+            'value': pa.array([], type=pa.float64())
+        })
+        result = datasink.write([empty_table], ctx)
+        self.assertEqual(result, [])
+
+        # Test single and multiple blocks
+        single_block = pa.table({
+            'id': [1, 2, 3],
+            'name': ['Alice', 'Bob', 'Charlie'],
+            'value': [1.1, 2.2, 3.3]
+        })
+        result = datasink.write([single_block], ctx)
+        self.assertIsInstance(result, list)
+        if result:
+            self.assertTrue(all(isinstance(msg, CommitMessage) for msg in 
result))
+
+        block1 = pa.table({
+            'id': [4, 5],
+            'name': ['David', 'Eve'],
+            'value': [4.4, 5.5]
+        })
+        block2 = pa.table({
+            'id': [6, 7],
+            'name': ['Frank', 'Grace'],
+            'value': [6.6, 7.7]
+        })
+        result = datasink.write([block1, block2], ctx)
+        self.assertIsInstance(result, list)
+        if result:
+            self.assertTrue(all(isinstance(msg, CommitMessage) for msg in 
result))
+
+        # Test that write creates WriteBuilder on worker (not using driver's 
builder)
+        with patch.object(self.table, 'new_batch_write_builder') as 
mock_builder:
+            mock_write_builder = Mock()
+            mock_write_builder.overwrite.return_value = mock_write_builder
+            mock_write = Mock()
+            mock_write.prepare_commit.return_value = []
+            mock_write_builder.new_write.return_value = mock_write
+            mock_builder.return_value = mock_write_builder
+
+            data_table = pa.table({
+                'id': [1],
+                'name': ['Alice'],
+                'value': [1.1]
+            })
+            datasink.write([data_table], ctx)
+            mock_builder.assert_called_once()
+
+        invalid_table = pa.table({
+            'wrong_column': [1, 2, 3]
+        })
+        with self.assertRaises(Exception):
+            datasink.write([invalid_table], ctx)
+
+        with patch.object(self.table, 'new_batch_write_builder') as 
mock_builder:
+            mock_write_builder = Mock()
+            mock_write_builder.overwrite.return_value = mock_write_builder
+            mock_write = Mock()
+            mock_write.write_arrow.side_effect = Exception("Write error")
+            mock_write_builder.new_write.return_value = mock_write
+            mock_builder.return_value = mock_write_builder
+
+            data_table = pa.table({
+                'id': [1],
+                'name': ['Alice'],
+                'value': [1.1]
+            })
+            with self.assertRaises(Exception):
+                datasink.write([data_table], ctx)
+            mock_write.close.assert_called_once()
+
+    def test_on_write_complete(self):
+        from ray.data.datasource.datasink import WriteResult
+
+        # Test empty messages
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        write_result = WriteResult(
+            num_rows=0,
+            size_bytes=0,
+            write_returns=[[], []]
+        )
+        datasink.on_write_complete(write_result)
+
+        # Test with messages and filtering empty messages
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        commit_msg1 = Mock(spec=CommitMessage)
+        commit_msg1.is_empty.return_value = False
+        commit_msg2 = Mock(spec=CommitMessage)
+        commit_msg2.is_empty.return_value = False
+        empty_msg = Mock(spec=CommitMessage)
+        empty_msg.is_empty.return_value = True
+
+        write_result = WriteResult(
+            num_rows=0,
+            size_bytes=0,
+            write_returns=[[commit_msg1], [commit_msg2], [empty_msg]]
+        )
+
+        mock_commit = Mock()
+        datasink._writer_builder.new_commit = Mock(return_value=mock_commit)
+        datasink.on_write_complete(write_result)
+
+        mock_commit.commit.assert_called_once()
+        commit_args = mock_commit.commit.call_args[0][0]
+        self.assertEqual(len(commit_args), 2)  # Empty message filtered out
+        mock_commit.close.assert_called_once()
+
+        # Test commit failure: abort should be called
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        commit_msg1 = Mock(spec=CommitMessage)
+        commit_msg1.is_empty.return_value = False
+        commit_msg2 = Mock(spec=CommitMessage)
+        commit_msg2.is_empty.return_value = False
+
+        write_result = WriteResult(
+            num_rows=0,
+            size_bytes=0,
+            write_returns=[[commit_msg1], [commit_msg2]]
+        )
+
+        mock_commit = Mock()
+        mock_commit.commit.side_effect = Exception("Commit failed")
+        datasink._writer_builder.new_commit = Mock(return_value=mock_commit)
+
+        with self.assertRaises(Exception):
+            datasink.on_write_complete(write_result)
+
+        mock_commit.abort.assert_called_once()
+        abort_args = mock_commit.abort.call_args[0][0]
+        self.assertEqual(len(abort_args), 2)
+        mock_commit.close.assert_called_once()
+
+        # Test table_commit creation failure
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        commit_msg1 = Mock(spec=CommitMessage)
+        commit_msg1.is_empty.return_value = False
+
+        write_result = WriteResult(
+            num_rows=0,
+            size_bytes=0,
+            write_returns=[[commit_msg1]]
+        )
+
+        mock_new_commit = Mock(side_effect=Exception("Failed to create 
table_commit"))
+        datasink._writer_builder.new_commit = mock_new_commit
+        with self.assertRaises(Exception):
+            datasink.on_write_complete(write_result)
+        self.assertEqual(len(datasink._pending_commit_messages), 1)
+
+    def test_on_write_failed(self):
+        # Test without pending messages (on_write_complete() never called)
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        self.assertEqual(datasink._pending_commit_messages, [])
+        error = Exception("Write job failed")
+        datasink.on_write_failed(error)  # Should not raise exception
+
+        # Test with pending messages (on_write_complete() was called but 
failed)
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        commit_msg1 = Mock(spec=CommitMessage)
+        commit_msg2 = Mock(spec=CommitMessage)
+        datasink._pending_commit_messages = [commit_msg1, commit_msg2]
+
+        mock_commit = Mock()
+        datasink._writer_builder.new_commit = Mock(return_value=mock_commit)
+        error = Exception("Write job failed")
+        datasink.on_write_failed(error)
+
+        mock_commit.abort.assert_called_once()
+        abort_args = mock_commit.abort.call_args[0][0]
+        self.assertEqual(len(abort_args), 2)
+        self.assertEqual(abort_args[0], commit_msg1)
+        self.assertEqual(abort_args[1], commit_msg2)
+        mock_commit.close.assert_called_once()
+        self.assertEqual(datasink._pending_commit_messages, [])
+
+        # Test abort failure handling (should not raise exception)
+        datasink = PaimonDatasink(self.table, overwrite=False)
+        datasink.on_write_start()
+        commit_msg1 = Mock(spec=CommitMessage)
+        datasink._pending_commit_messages = [commit_msg1]
+
+        mock_commit = Mock()
+        mock_commit.abort.side_effect = Exception("Abort failed")
+        datasink._writer_builder.new_commit = Mock(return_value=mock_commit)
+        error = Exception("Write job failed")
+        datasink.on_write_failed(error)
+
+        mock_commit.abort.assert_called_once()
+        mock_commit.close.assert_called_once()
+        self.assertEqual(datasink._pending_commit_messages, [])
+
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/paimon-python/pypaimon/write/ray_datasink.py 
b/paimon-python/pypaimon/write/ray_datasink.py
index 709a010fa4..5c809825a7 100644
--- a/paimon-python/pypaimon/write/ray_datasink.py
+++ b/paimon-python/pypaimon/write/ray_datasink.py
@@ -1,3 +1,4 @@
+################################################################################
 #  Licensed to the Apache Software Foundation (ASF) under one
 #  or more contributor license agreements.  See the NOTICE file
 #  distributed with this work for additional information
@@ -15,77 +16,176 @@
 # limitations under the License.
 
################################################################################
 """
-Module to reawrited a Paimon table from a Ray Dataset, by using the Ray 
Datasink API.
+Module to write a Paimon table from a Ray Dataset, by using the Ray Datasink 
API.
 """
 
-from typing import Iterable
-from ray.data.datasource.datasink import Datasink, WriteResult, WriteReturnType
-from pypaimon.table.table import Table
-from pypaimon.write.write_builder import WriteBuilder
-from ray.data.block import BlockAccessor
-from ray.data.block import Block
+import logging
+from typing import TYPE_CHECKING, Iterable, List, Optional
+
+from ray.data.datasource.datasink import Datasink, WriteResult
+from ray.util.annotations import DeveloperAPI
+from ray.data.block import BlockAccessor, Block
 from ray.data._internal.execution.interfaces import TaskContext
 import pyarrow as pa
 
+if TYPE_CHECKING:
+    from pypaimon.table.table import Table
+    from pypaimon.write.write_builder import WriteBuilder
+    from pypaimon.write.commit_message import CommitMessage
+
+logger = logging.getLogger(__name__)
 
-class PaimonDatasink(Datasink):
-    
-    def __init__(self, table: Table, overwrite=False):
+
+@DeveloperAPI
+class PaimonDatasink(Datasink[List["CommitMessage"]]):
+    def __init__(
+        self,
+        table: "Table",
+        overwrite: bool = False,
+    ):
         self.table = table
         self.overwrite = overwrite
+        self._table_name = table.identifier.get_full_name()
+        self._writer_builder: Optional["WriteBuilder"] = None
+        self._pending_commit_messages: List["CommitMessage"] = []
 
-    def on_write_start(self, schema=None) -> None:
-        """Callback for when a write job starts.
+    def __getstate__(self) -> dict:
+        state = self.__dict__.copy()
+        return state
 
-        Use this method to perform setup for write tasks. For example, 
creating a
-        staging bucket in S3.
+    def __setstate__(self, state: dict) -> None:
+        self.__dict__.update(state)
+        writer_builder = getattr(self, '_writer_builder', None)
+        if writer_builder is not None and not hasattr(writer_builder, 'table'):
+            self._writer_builder = None
+        if not hasattr(self, '_table_name'):
+            self._table_name = self.table.identifier.get_full_name()
 
-        Args:
-            schema: Optional schema information passed by Ray Data.
-        """
-        self.writer_builder: WriteBuilder = 
self.table.new_batch_write_builder()
+    def on_write_start(self, schema=None) -> None:
+        logger.info(f"Starting write job for table {self._table_name}")
+
+        self._writer_builder = self.table.new_batch_write_builder()
         if self.overwrite:
-            self.writer_builder = self.writer_builder.overwrite()
+            self._writer_builder = self._writer_builder.overwrite()
 
     def write(
         self,
         blocks: Iterable[Block],
         ctx: TaskContext,
-    ) -> WriteReturnType:
-        """Write blocks. This is used by a single write task.
-
-        Args:
-            blocks: Generator of data blocks.
-            ctx: ``TaskContext`` for the write task.
-
-        Returns:
-            Result of this write task. When the entire write operator finishes,
-            All returned values will be passed as `WriteResult.write_returns`
-            to `Datasink.on_write_complete`.
-        """
-        table_write = self.writer_builder.new_write()
-        for block in blocks:
-            block_arrow: pa.Table = BlockAccessor.for_block(block).to_arrow()
-            table_write.write_arrow(block_arrow)
-        commit_messages = table_write.prepare_commit()
-        table_write.close()
-        return commit_messages
-
-    def on_write_complete(self, write_result: WriteResult[WriteReturnType]):
-        """Callback for when a write job completes.
-
-        This can be used to `commit` a write output. This method must
-        succeed prior to ``write_datasink()`` returning to the user. If this
-        method fails, then ``on_write_failed()`` is called.
-
-        Args:
-            write_result: Aggregated result of the
-            Write operator, containing write results and stats.
-        """
-        table_commit = self.writer_builder.new_commit()
-        table_commit.commit([
-            commit_message
-            for commit_messages in write_result.write_returns
-            for commit_message in commit_messages
-        ])
-        table_commit.close()
+    ) -> List["CommitMessage"]:
+        commit_messages_list: List["CommitMessage"] = []
+        table_write = None
+
+        try:
+            writer_builder = self.table.new_batch_write_builder()
+            if self.overwrite:
+                writer_builder = writer_builder.overwrite()
+            
+            table_write = writer_builder.new_write()
+
+            for block in blocks:
+                block_arrow: pa.Table = 
BlockAccessor.for_block(block).to_arrow()
+
+                if block_arrow.num_rows == 0:
+                    continue
+
+                table_write.write_arrow(block_arrow)
+
+            commit_messages = table_write.prepare_commit()
+            commit_messages_list.extend(commit_messages)
+        finally:
+            if table_write is not None:
+                table_write.close()
+
+        return commit_messages_list
+
+    def on_write_complete(
+        self, write_result: WriteResult[List["CommitMessage"]]
+    ):
+        table_commit = None
+        commit_messages_to_abort = []
+        try:
+            all_commit_messages = [
+                commit_message
+                for commit_messages in write_result.write_returns
+                for commit_message in commit_messages
+            ]
+
+            non_empty_messages = [
+                msg for msg in all_commit_messages if not msg.is_empty()
+            ]
+
+            self._pending_commit_messages = non_empty_messages
+
+            if not non_empty_messages:
+                logger.info("No data to commit (all commit messages are 
empty)")
+                self._pending_commit_messages = []
+                return
+
+            logger.info(
+                f"Committing {len(non_empty_messages)} commit messages "
+                f"for table {self._table_name}"
+            )
+
+            table_commit = self._writer_builder.new_commit()
+            commit_messages_to_abort = non_empty_messages
+            table_commit.commit(non_empty_messages)
+
+            commit_messages_to_abort = []
+            self._pending_commit_messages = []
+
+            logger.info(f"Successfully committed write job for table 
{self._table_name}")
+        except Exception as e:
+            logger.error(
+                f"Error committing write job for table {self._table_name}: 
{e}",
+                exc_info=e
+            )
+            if table_commit is not None and commit_messages_to_abort:
+                try:
+                    table_commit.abort(commit_messages_to_abort)
+                    logger.info(
+                        f"Aborted {len(commit_messages_to_abort)} commit 
messages "
+                        f"for table {self._table_name}"
+                    )
+                except Exception as abort_error:
+                    logger.error(
+                        f"Error aborting commit messages: {abort_error}",
+                        exc_info=abort_error
+                    )
+                finally:
+                    self._pending_commit_messages = []
+            raise
+        finally:
+            if table_commit is not None:
+                try:
+                    table_commit.close()
+                except Exception as e:
+                    logger.warning(
+                        f"Error closing table_commit: {e}",
+                        exc_info=e
+                    )
+
+    def on_write_failed(self, error: Exception) -> None:
+        logger.error(
+            f"Write job failed for table {self._table_name}. Error: {error}",
+            exc_info=error
+        )
+        
+        if self._pending_commit_messages:
+            try:
+                table_commit = self._writer_builder.new_commit()
+                try:
+                    table_commit.abort(self._pending_commit_messages)
+                    logger.info(
+                        f"Aborted {len(self._pending_commit_messages)} commit 
messages "
+                        f"for table {self._table_name} in on_write_failed()"
+                    )
+                finally:
+                    table_commit.close()
+            except Exception as abort_error:
+                logger.error(
+                    f"Error aborting commit messages in on_write_failed(): 
{abort_error}",
+                    exc_info=abort_error
+                )
+            finally:
+                self._pending_commit_messages = []
diff --git a/paimon-python/pypaimon/write/table_write.py 
b/paimon-python/pypaimon/write/table_write.py
index 9a4bdecf26..a2c9a9a4c3 100644
--- a/paimon-python/pypaimon/write/table_write.py
+++ b/paimon-python/pypaimon/write/table_write.py
@@ -16,7 +16,7 @@
 # limitations under the License.
 
################################################################################
 from collections import defaultdict
-from typing import List
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
 
 import pyarrow as pa
 
@@ -25,6 +25,9 @@ from pypaimon.snapshot.snapshot import BATCH_COMMIT_IDENTIFIER
 from pypaimon.write.commit_message import CommitMessage
 from pypaimon.write.file_store_write import FileStoreWrite
 
+if TYPE_CHECKING:
+    from ray.data import Dataset
+
 
 class TableWrite:
     def __init__(self, table, commit_user):
@@ -68,10 +71,32 @@ class TableWrite:
         self.file_store_write.write_cols = write_cols
         return self
 
-    def write_raydata(self, dataset, overwrite=False, parallelism=1):
+    def write_ray(
+        self,
+        dataset: "Dataset",
+        overwrite: bool = False,
+        concurrency: Optional[int] = None,
+        ray_remote_args: Optional[Dict[str, Any]] = None,
+    ) -> None:
+        """
+        Write a Ray Dataset to Paimon table.
+        
+        Args:
+            dataset: Ray Dataset to write. This is a distributed data 
collection
+                from Ray Data (ray.data.Dataset).
+            overwrite: Whether to overwrite existing data. Defaults to False.
+            concurrency: Optional max number of Ray tasks to run concurrently.
+                By default, dynamically decided based on available resources.
+            ray_remote_args: Optional kwargs passed to :func:`ray.remote` in 
write tasks.
+                For example, ``{"num_cpus": 2, "max_retries": 3}``.
+        """
         from pypaimon.write.ray_datasink import PaimonDatasink
         datasink = PaimonDatasink(self.table, overwrite=overwrite)
-        dataset.write_datasink(datasink, concurrency=parallelism)
+        dataset.write_datasink(
+            datasink,
+            concurrency=concurrency,
+            ray_remote_args=ray_remote_args,
+        )
 
     def close(self):
         self.file_store_write.close()
diff --git a/pom.xml b/pom.xml
index f035f79c22..c9025eee8c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -637,6 +637,7 @@ under the License.
                         <exclude>paimon-common/src/main/antlr4/**</exclude>
                         
<exclude>paimon-core/src/test/resources/compatibility/**</exclude>
                         
<exclude>paimon-faiss/paimon-faiss-jni/build/**</exclude>
+                        
<exclude>paimon-python/pypaimon/sample/data/**</exclude>
                     </excludes>
                 </configuration>
             </plugin>


Reply via email to