This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new be172f4d90 [python] sync to_ray method args with ray data api (#6948)
be172f4d90 is described below
commit be172f4d90984c545fca1a94439f602aa159a95c
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Jan 5 02:28:28 2026 +0800
[python] sync to_ray method args with ray data api (#6948)
---
docs/content/program-api/python-api.md | 42 +++++++++++++++++++---
paimon-python/pypaimon/read/table_read.py | 41 +++++++++++++++++----
.../sample/rest_catalog_ray_data_sample.py | 8 ++---
paimon-python/pypaimon/tests/ray_data_test.py | 41 ++++++++++-----------
4 files changed, 93 insertions(+), 39 deletions(-)
diff --git a/docs/content/program-api/python-api.md b/docs/content/program-api/python-api.md
index 2d9d359e18..406c8c1ef6 100644
--- a/docs/content/program-api/python-api.md
+++ b/docs/content/program-api/python-api.md
@@ -426,14 +426,21 @@ print(ray_dataset.to_pandas())
# ...
```
-The `to_ray()` method supports a `parallelism` parameter to control distributed reading. Use `parallelism=1` for single-task read (default) or `parallelism > 1` for distributed read with multiple Ray workers:
+The `to_ray()` method supports Ray Data API parameters for distributed processing:
```python
-# Simple mode (single task)
-ray_dataset = table_read.to_ray(splits, parallelism=1)
+# Basic usage
+ray_dataset = table_read.to_ray(splits)
+
+# Specify number of output blocks
+ray_dataset = table_read.to_ray(splits, override_num_blocks=4)
-# Distributed mode with 4 parallel tasks
-ray_dataset = table_read.to_ray(splits, parallelism=4)
+# Configure Ray remote arguments
+ray_dataset = table_read.to_ray(
+ splits,
+ override_num_blocks=4,
+ ray_remote_args={"num_cpus": 2, "max_retries": 3}
+)
# Use Ray Data operations
mapped_dataset = ray_dataset.map(lambda row: {'value': row['value'] * 2})
@@ -441,6 +448,31 @@ filtered_dataset = ray_dataset.filter(lambda row: row['score'] > 80)
df = ray_dataset.to_pandas()
```
+**Parameters:**
+- `override_num_blocks`: Optional override for the number of output blocks. By default,
+ Ray automatically determines the optimal number.
+- `ray_remote_args`: Optional kwargs passed to `ray.remote()` in read tasks
+ (e.g., `{"num_cpus": 2, "max_retries": 3}`).
+- `concurrency`: Optional max number of Ray tasks to run concurrently. By default,
+ dynamically decided based on available resources.
+- `**read_args`: Additional kwargs passed to the datasource (e.g., `per_task_row_limit`
+ in Ray 2.52.0+).
+
+**Ray Block Size Configuration:**
+
+If you need to configure Ray's block size (e.g., when Paimon splits exceed Ray's default
+128MB block size), set it before calling `to_ray()`:
+
+```python
+from ray.data import DataContext
+
+ctx = DataContext.get_current()
+ctx.target_max_block_size = 256 * 1024 * 1024 # 256MB (default is 128MB)
+ray_dataset = table_read.to_ray(splits)
+```
+
+See [Ray Data API Documentation](https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html) for more details.
+
### Incremental Read
This API allows reading data committed between two snapshot timestamps. The steps are as follows.
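Taken together, the documented pieces compose as in the following sketch. It is illustrative only: it assumes `table_read` and `splits` have already been obtained from the usual read-builder flow (as in the sample script changed below), and it uses only the documented `to_ray()` arguments plus the `DataContext` setting described above.

```python
# Illustrative sketch: assumes `table_read` and `splits` come from the existing
# read-builder flow; only the documented to_ray() arguments are used here.
from ray.data import DataContext

# Raise Ray's target block size before reading (default is 128MB).
ctx = DataContext.get_current()
ctx.target_max_block_size = 256 * 1024 * 1024  # 256MB

ray_dataset = table_read.to_ray(
    splits,
    override_num_blocks=4,                              # number of output blocks
    ray_remote_args={"num_cpus": 2, "max_retries": 3},  # forwarded to ray.remote()
    concurrency=2,                                      # max concurrent read tasks
)

# Standard Ray Data operations work on the result.
df = ray_dataset.filter(lambda row: row["score"] > 80).to_pandas()
```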
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 626cbc2be7..953384cc7d 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Iterator, List, Optional
+from typing import Any, Dict, Iterator, List, Optional
import pandas
import pyarrow
@@ -128,8 +128,30 @@ class TableRead:
con.register(table_name, self.to_arrow(splits))
return con
- def to_ray(self, splits: List[Split], parallelism: int = 1) -> "ray.data.dataset.Dataset":
- """Convert Paimon table data to Ray Dataset."""
+ def to_ray(
+ self,
+ splits: List[Split],
+ *,
+ ray_remote_args: Optional[Dict[str, Any]] = None,
+ concurrency: Optional[int] = None,
+ override_num_blocks: Optional[int] = None,
+ **read_args,
+ ) -> "ray.data.dataset.Dataset":
+ """Convert Paimon table data to Ray Dataset.
+ Args:
+ splits: List of splits to read from the Paimon table.
+ ray_remote_args: Optional kwargs passed to :func:`ray.remote` in read tasks.
+ For example, ``{"num_cpus": 2, "max_retries": 3}``.
+ concurrency: Optional max number of Ray tasks to run concurrently.
+ By default, dynamically decided based on available resources.
+ override_num_blocks: Optional override for the number of output blocks.
+ You needn't manually set this in most cases.
+ **read_args: Additional kwargs passed to the datasource.
+ For example, ``per_task_row_limit`` (Ray 2.52.0+).
+
+ See `Ray Data API <https://docs.ray.io/en/latest/data/api/doc/ray.data.read_datasource.html>`_
+ for details.
+ """
import ray
if not splits:
@@ -140,13 +162,18 @@ class TableRead:
)
return ray.data.from_arrow(empty_table)
- # Validate parallelism parameter
- if parallelism < 1:
- raise ValueError(f"parallelism must be at least 1, got
{parallelism}")
+ if override_num_blocks is not None and override_num_blocks < 1:
+ raise ValueError(f"override_num_blocks must be at least 1, got
{override_num_blocks}")
from pypaimon.read.ray_datasource import PaimonDatasource
datasource = PaimonDatasource(self, splits)
- return ray.data.read_datasource(datasource, parallelism=parallelism)
+ return ray.data.read_datasource(
+ datasource,
+ ray_remote_args=ray_remote_args,
+ concurrency=concurrency,
+ override_num_blocks=override_num_blocks,
+ **read_args
+ )
def _create_split_read(self, split: Split) -> SplitRead:
if self.table.is_primary_key_table and not split.raw_convertible:
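The unchanged empty-splits branch above still short-circuits through `ray.data.from_arrow(empty_table)`, which wraps an empty Arrow table so the resulting Ray Dataset keeps the read schema. A minimal standalone illustration of that pattern (the schema here is hypothetical, not the table's actual read schema):

```python
import pyarrow as pa
import ray

# Hypothetical schema standing in for the table's read schema.
schema = pa.schema([("id", pa.int32()), ("name", pa.string())])

# An empty Arrow table with the right schema yields an empty, schema-aware Ray Dataset.
empty_table = pa.Table.from_pylist([], schema=schema)
empty_dataset = ray.data.from_arrow(empty_table)

assert empty_dataset.count() == 0
print(empty_dataset.schema())
```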
diff --git a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
index 268946b5a2..fafe31b2f3 100644
--- a/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
+++ b/paimon-python/pypaimon/sample/rest_catalog_ray_data_sample.py
@@ -121,7 +121,7 @@ def main():
print(f"Number of splits: {len(splits)}")
# Convert to Ray Dataset
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
print("✓ Ray Dataset created successfully")
print(f" - Total rows: {ray_dataset.count()}")
# Note: num_blocks() requires materialized dataset, so we skip it for simplicity
@@ -135,7 +135,7 @@ def main():
print("\n" + "="*60)
print("Step 3: Comparison with simple read mode")
print("="*60)
- ray_dataset_simple = table_read.to_ray(splits, parallelism=1)
+ ray_dataset_simple = table_read.to_ray(splits, override_num_blocks=1)
df_simple = ray_dataset_simple.to_pandas()
print(f"Distributed mode rows: {ray_dataset.count()}")
@@ -193,7 +193,7 @@ def main():
table_scan_filtered = read_builder_filtered.new_scan()
splits_filtered = table_scan_filtered.plan().splits()
- ray_dataset_filtered = table_read_filtered.to_ray(splits_filtered, parallelism=2)
+ ray_dataset_filtered = table_read_filtered.to_ray(splits_filtered, override_num_blocks=2)
df_filtered_at_read = ray_dataset_filtered.to_pandas()
print(f"✓ Filtered at read time: {ray_dataset_filtered.count()} rows")
print(f" - All categories are 'A':
{all(df_filtered_at_read['category'] == 'A')}")
@@ -207,7 +207,7 @@ def main():
table_scan_projected = read_builder_projected.new_scan()
splits_projected = table_scan_projected.plan().splits()
- ray_dataset_projected = table_read_projected.to_ray(splits_projected, parallelism=2)
+ ray_dataset_projected = table_read_projected.to_ray(splits_projected, override_num_blocks=2)
df_projected = ray_dataset_projected.to_pandas()
print(f"✓ Projected columns: {list(df_projected.columns)}")
print(" - Expected: ['id', 'name', 'value']")
diff --git a/paimon-python/pypaimon/tests/ray_data_test.py b/paimon-python/pypaimon/tests/ray_data_test.py
index fc9e967ddf..42bca3d60b 100644
--- a/paimon-python/pypaimon/tests/ray_data_test.py
+++ b/paimon-python/pypaimon/tests/ray_data_test.py
@@ -109,7 +109,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
# Verify Ray dataset
self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
@@ -169,7 +169,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
# Verify filtered results
self.assertEqual(ray_dataset.count(), 2, "Should have 2 rows after filtering")
@@ -215,7 +215,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
# Verify projection
self.assertEqual(ray_dataset.count(), 3, "Should have 3 rows")
@@ -256,7 +256,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
# Apply map operation (double the value)
def double_value(row):
@@ -303,7 +303,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
# Apply filter operation (score >= 80)
filtered_dataset = ray_dataset.filter(lambda row: row['score'] >= 80)
@@ -346,8 +346,8 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset_distributed = table_read.to_ray(splits, parallelism=2)
- ray_dataset_simple = table_read.to_ray(splits, parallelism=1)
+ ray_dataset_distributed = table_read.to_ray(splits, override_num_blocks=2)
+ ray_dataset_simple = table_read.to_ray(splits, override_num_blocks=1)
# Both should produce the same results
self.assertEqual(ray_dataset_distributed.count(), 3, "Distributed mode should have 3 rows")
@@ -394,7 +394,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=1)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
self.assertEqual(ray_dataset.count(), 5, "Should have 5 rows")
@@ -453,7 +453,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
self.assertEqual(ray_dataset.count(), 4, "Should have 4 rows after upsert")
@@ -510,7 +510,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=1)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
# Verify filtered results
self.assertEqual(ray_dataset.count(), 2, "Should have 2 rows after filtering")
@@ -528,7 +528,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=1)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=1)
# Verify filtered results by partition
self.assertEqual(ray_dataset.count(), 2, "Should have 2 rows in partition 2024-01-01")
@@ -588,7 +588,7 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- ray_dataset = table_read.to_ray(splits, parallelism=2)
+ ray_dataset = table_read.to_ray(splits, override_num_blocks=2)
self.assertIsNotNone(ray_dataset, "Ray dataset should not be None")
self.assertEqual(ray_dataset.count(), 4, "Should have 4 rows after upsert")
@@ -604,7 +604,6 @@ class RayDataTest(unittest.TestCase):
self.assertEqual(list(df_sorted['value']), [150, 250, 300, 400],
"Value column should reflect updates")
def test_ray_data_invalid_parallelism(self):
- """Test that invalid parallelism values raise ValueError."""
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -633,21 +632,17 @@ class RayDataTest(unittest.TestCase):
table_scan = read_builder.new_scan()
splits = table_scan.plan().splits()
- # Test with parallelism = 0
with self.assertRaises(ValueError) as context:
- table_read.to_ray(splits, parallelism=0)
- self.assertIn("parallelism must be at least 1", str(context.exception))
+ table_read.to_ray(splits, override_num_blocks=0)
+ self.assertIn("override_num_blocks must be at least 1",
str(context.exception))
- # Test with parallelism < 0
with self.assertRaises(ValueError) as context:
- table_read.to_ray(splits, parallelism=-1)
- self.assertIn("parallelism must be at least 1", str(context.exception))
+ table_read.to_ray(splits, override_num_blocks=-1)
+ self.assertIn("override_num_blocks must be at least 1",
str(context.exception))
- # Test with parallelism = -10
with self.assertRaises(ValueError) as context:
- table_read.to_ray(splits, parallelism=-10)
- self.assertIn("parallelism must be at least 1", str(context.exception))
-
+ table_read.to_ray(splits, override_num_blocks=-10)
+ self.assertIn("override_num_blocks must be at least 1",
str(context.exception))
if __name__ == '__main__':
unittest.main()