This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7f1c3293a1 [python] ray version compatible (#6937)
7f1c3293a1 is described below
commit 7f1c3293a15ebeff994f4e90d337b57e9de202de
Author: XiaoHongbo <[email protected]>
AuthorDate: Wed Dec 31 18:16:53 2025 +0800
[python] ray version compatible (#6937)
---
.github/workflows/paimon-python-checks.yml | 104 ++++++++++++++++++++++++++
paimon-python/pypaimon/read/ray_datasource.py | 50 +++++++++----
2 files changed, 138 insertions(+), 16 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 6df806a4f4..4fb7fe07e4 100755
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -101,3 +101,107 @@ jobs:
run: |
chmod +x paimon-python/dev/lint-python.sh
./paimon-python/dev/lint-python.sh
+
+ requirement_version_compatible_test:
+ runs-on: ubuntu-latest
+ container: "python:3.10-slim"
+
+ steps:
+ - name: Checkout code
+ uses: actions/checkout@v2
+
+ - name: Set up JDK ${{ env.JDK_VERSION }}
+ uses: actions/setup-java@v4
+ with:
+ java-version: ${{ env.JDK_VERSION }}
+ distribution: 'temurin'
+
+ - name: Set up Maven
+ uses: stCarolas/[email protected]
+ with:
+ maven-version: 3.8.8
+
+ - name: Install system dependencies
+ shell: bash
+ run: |
+ apt-get update && apt-get install -y \
+ build-essential \
+ git \
+ curl \
+ && rm -rf /var/lib/apt/lists/*
+
+ - name: Verify Java and Maven installation
+ run: |
+ java -version
+ mvn -version
+
+ - name: Verify Python version
+ run: python --version
+
+ - name: Build Java
+ run: |
+ echo "Start compiling modules"
+ mvn -T 2C -B clean install -DskipTests
+
+ - name: Install base Python dependencies
+ shell: bash
+ run: |
+ python -m pip install --upgrade pip
+ python -m pip install -q \
+ pyroaring \
+ readerwriterlock==1.0.9 \
+ fsspec==2024.3.1 \
+ cachetools==5.3.3 \
+ ossfs==2023.12.0 \
+ fastavro==1.11.1 \
+ pyarrow==16.0.0 \
+ zstandard==0.24.0 \
+ polars==1.32.0 \
+ duckdb==1.3.2 \
+ numpy==1.24.3 \
+ pandas==2.0.3 \
+ pytest~=7.0 \
+ py4j==0.10.9.9 \
+ requests \
+ parameterized==0.9.0 \
+ packaging
+
+ - name: Test requirement version compatibility
+ shell: bash
+ run: |
+ cd paimon-python
+
+ # Test Ray version compatibility
+ echo "=========================================="
+ echo "Testing Ray version compatibility"
+ echo "=========================================="
+ for ray_version in 2.44.0 2.48.0 2.53.0; do
+ echo "Testing Ray version: $ray_version"
+
+ # Install specific Ray version
+ python -m pip install -q ray==$ray_version
+
+ # Verify Ray version
+ python -c "import ray; print(f'Ray version: {ray.__version__}')"
+        python -c "from packaging.version import parse; import ray; assert parse(ray.__version__) == parse('$ray_version'), f'Expected Ray $ray_version, got {ray.__version__}'"
+
+ # Run tests
+        python -m pytest pypaimon/tests/ray_data_test.py::RayDataTest -v --tb=short || {
+ echo "Tests failed for Ray $ray_version"
+ exit 1
+ }
+
+ # Uninstall Ray to avoid conflicts
+ python -m pip uninstall -y ray
+ done
+
+ # Add other dependency version tests here in the future
+ # Example:
+ # echo "=========================================="
+ # echo "Testing PyArrow version compatibility"
+ # echo "=========================================="
+ # for pyarrow_version in 16.0.0 17.0.0 18.0.0; do
+ # ...
+ # done
+ env:
+ PYTHONPATH: ${{ github.workspace }}/paimon-python
diff --git a/paimon-python/pypaimon/read/ray_datasource.py b/paimon-python/pypaimon/read/ray_datasource.py
index 9a13ae0fa1..905c8bddef 100644
--- a/paimon-python/pypaimon/read/ray_datasource.py
+++ b/paimon-python/pypaimon/read/ray_datasource.py
@@ -25,6 +25,8 @@ from functools import partial
from typing import List, Optional, Iterable
import pyarrow
+from packaging.version import parse
+import ray
from pypaimon.read.split import Split
from pypaimon.read.table_read import TableRead
@@ -32,6 +34,10 @@ from pypaimon.schema.data_types import PyarrowFieldParser
logger = logging.getLogger(__name__)
+# Ray version constants for compatibility
+RAY_VERSION_SCHEMA_IN_READ_TASK = "2.48.0"  # Schema moved from BlockMetadata to ReadTask
+RAY_VERSION_PER_TASK_ROW_LIMIT = "2.52.0"  # per_task_row_limit parameter introduced
+
from ray.data.datasource import Datasource
@@ -94,11 +100,13 @@ class PaimonDatasource(Datasource):
return chunks
- def get_read_tasks(self, parallelism: int) -> List:
+ def get_read_tasks(self, parallelism: int, **kwargs) -> List:
"""Return a list of read tasks that can be executed in parallel."""
from ray.data.datasource import ReadTask
from ray.data.block import BlockMetadata
+ per_task_row_limit = kwargs.get('per_task_row_limit', None)
+
# Validate parallelism parameter
if parallelism < 1:
            raise ValueError(f"parallelism must be at least 1, got {parallelism}")
@@ -191,20 +199,30 @@ class PaimonDatasource(Datasource):
num_rows = total_rows if total_rows > 0 else None
size_bytes = total_size if total_size > 0 else None
- metadata = BlockMetadata(
- num_rows=num_rows,
- size_bytes=size_bytes,
- input_files=input_files if input_files else None,
- exec_stats=None, # Will be populated by Ray during execution
- )
-
-        # TODO: per_task_row_limit is not supported in Ray 2.48.0, will be added in future versions
- read_tasks.append(
- ReadTask(
- read_fn=lambda splits=chunk_splits: get_read_task(splits),
- metadata=metadata,
- schema=schema,
- )
- )
+ metadata_kwargs = {
+ 'num_rows': num_rows,
+ 'size_bytes': size_bytes,
+ 'input_files': input_files if input_files else None,
+            'exec_stats': None,  # Will be populated by Ray during execution
+ }
+
+ if parse(ray.__version__) < parse(RAY_VERSION_SCHEMA_IN_READ_TASK):
+ metadata_kwargs['schema'] = schema
+
+ metadata = BlockMetadata(**metadata_kwargs)
+
+ read_fn = partial(get_read_task, chunk_splits)
+ read_task_kwargs = {
+ 'read_fn': read_fn,
+ 'metadata': metadata,
+ }
+
+        if parse(ray.__version__) >= parse(RAY_VERSION_SCHEMA_IN_READ_TASK):
+ read_task_kwargs['schema'] = schema
+
+        if parse(ray.__version__) >= parse(RAY_VERSION_PER_TASK_ROW_LIMIT) and per_task_row_limit is not None:
+ read_task_kwargs['per_task_row_limit'] = per_task_row_limit
+
+ read_tasks.append(ReadTask(**read_task_kwargs))
return read_tasks