This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git


The following commit(s) were added to refs/heads/main by this push:
     new 03108ec  Add read API to convert result to DuckDB and Ray (#28)
03108ec is described below

commit 03108ec2edb1b08ffb53de52c5452dc75adf2b34
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 26 13:08:14 2024 +0800

    Add read API to convert result to DuckDB and Ray (#28)
---
 dev/dev-requirements.txt                        |  2 +
 paimon_python_api/table_read.py                 | 16 +++++++-
 paimon_python_java/pypaimon.py                  | 15 ++++++++
 paimon_python_java/tests/test_write_and_read.py | 49 +++++++++++++++++++------
 4 files changed, 70 insertions(+), 12 deletions(-)

diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt
index 7fd1aeb..4ed964e 100755
--- a/dev/dev-requirements.txt
+++ b/dev/dev-requirements.txt
@@ -26,3 +26,5 @@ numpy>=1.22.4
 python-dateutil>=2.8.0,<3
 pytz>=2018.3
 pytest~=7.0
+duckdb>=0.5.0,<2.0.0
+ray~=2.10.0
diff --git a/paimon_python_api/table_read.py b/paimon_python_api/table_read.py
index 24095b4..f0a7b59 100644
--- a/paimon_python_api/table_read.py
+++ b/paimon_python_api/table_read.py
@@ -18,10 +18,12 @@
 
 import pandas as pd
 import pyarrow as pa
+import ray
 
 from abc import ABC, abstractmethod
+from duckdb.duckdb import DuckDBPyConnection
 from paimon_python_api import Split
-from typing import List
+from typing import List, Optional
 
 
 class TableRead(ABC):
@@ -38,3 +40,15 @@ class TableRead(ABC):
     @abstractmethod
     def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
         """Read data from splits and converted to pandas.DataFrame format."""
+
+    @abstractmethod
+    def to_duckdb(
+            self,
+            splits: List[Split],
+            table_name: str,
+            connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        """Convert splits into an in-memory DuckDB table which can be queried."""
+
+    @abstractmethod
+    def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+        """Convert splits into a Ray dataset format."""
diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py
index b884fa4..803540c 100644
--- a/paimon_python_java/pypaimon.py
+++ b/paimon_python_java/pypaimon.py
@@ -16,9 +16,12 @@
 # limitations under the License.
 
################################################################################
 
+import duckdb
 import pandas as pd
 import pyarrow as pa
+import ray
 
+from duckdb.duckdb import DuckDBPyConnection
 from paimon_python_java.java_gateway import get_gateway
 from paimon_python_java.util import java_utils, constants
 from paimon_python_api import (catalog, table, read_builder, table_scan, split, table_read,
@@ -161,6 +164,18 @@ class TableRead(table_read.TableRead):
     def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
         return self.to_arrow(splits).to_pandas()
 
+    def to_duckdb(
+            self,
+            splits: List[Split],
+            table_name: str,
+            connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+        con = connection or duckdb.connect(database=":memory:")
+        con.register(table_name, self.to_arrow(splits))
+        return con
+
+    def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+        return ray.data.from_arrow(self.to_arrow(splits))
+
     def _init(self):
         if self._j_bytes_reader is None:
             # get thread num
diff --git a/paimon_python_java/tests/test_write_and_read.py b/paimon_python_java/tests/test_write_and_read.py
index 337b9f5..e2c631d 100644
--- a/paimon_python_java/tests/test_write_and_read.py
+++ b/paimon_python_java/tests/test_write_and_read.py
@@ -267,6 +267,12 @@ class TableWriteReadTest(unittest.TestCase):
         table_write.close()
         table_commit.close()
 
+        all_data = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
+            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
+        })
+        all_data['f0'] = all_data['f0'].astype('int32')
+
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
@@ -274,10 +280,7 @@ class TableWriteReadTest(unittest.TestCase):
 
         # to_arrow
         actual = table_read.to_arrow(splits)
-        expected = pa.Table.from_pydict({
-            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
-            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
-        }, schema=self.simple_pa_schema)
+        expected = pa.Table.from_pandas(all_data, schema=self.simple_pa_schema)
         self.assertEqual(actual, expected)
 
         # to_arrow_batch_reader
@@ -286,18 +289,42 @@ class TableWriteReadTest(unittest.TestCase):
             for batch in table_read.to_arrow_batch_reader(splits)
         ]
         actual = pd.concat(data_frames)
-        expected = pd.DataFrame({
-            'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
-            'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
-        })
-        expected['f0'] = expected['f0'].astype('int32')
         pd.testing.assert_frame_equal(
-            actual.reset_index(drop=True), expected.reset_index(drop=True))
+            actual.reset_index(drop=True), all_data.reset_index(drop=True))
 
         # to_pandas
         actual = table_read.to_pandas(splits)
         pd.testing.assert_frame_equal(
-            actual.reset_index(drop=True), expected.reset_index(drop=True))
+            actual.reset_index(drop=True), all_data.reset_index(drop=True))
+
+        # to_duckdb
+        duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
+        # select *
+        result1 = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
+        pd.testing.assert_frame_equal(
+            result1.reset_index(drop=True), all_data.reset_index(drop=True))
+        # select * where
+        result2 = duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 < 4").fetchdf()
+        expected2 = pd.DataFrame({
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c']
+        })
+        expected2['f0'] = expected2['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            result2.reset_index(drop=True), expected2.reset_index(drop=True))
+        # select f0 where
+        result3 = duckdb_con.query("SELECT f0 FROM duckdb_table WHERE f0 < 4").fetchdf()
+        expected3 = pd.DataFrame({
+            'f0': [1, 2, 3]
+        })
+        expected3['f0'] = expected3['f0'].astype('int32')
+        pd.testing.assert_frame_equal(
+            result3.reset_index(drop=True), expected3.reset_index(drop=True))
+
+        # to_ray
+        ray_dataset = table_read.to_ray(splits)
+        pd.testing.assert_frame_equal(
+            ray_dataset.to_pandas().reset_index(drop=True), all_data.reset_index(drop=True))
 
     def test_overwrite(self):
         schema = Schema(self.simple_pa_schema, partition_keys=['f0'],

Reply via email to