This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git
The following commit(s) were added to refs/heads/main by this push:
new 03108ec Add read API to convert result to DuckDB and Ray (#28)
03108ec is described below
commit 03108ec2edb1b08ffb53de52c5452dc75adf2b34
Author: yuzelin <[email protected]>
AuthorDate: Tue Nov 26 13:08:14 2024 +0800
Add read API to convert result to DuckDB and Ray (#28)
---
dev/dev-requirements.txt | 2 +
paimon_python_api/table_read.py | 16 +++++++-
paimon_python_java/pypaimon.py | 15 ++++++++
paimon_python_java/tests/test_write_and_read.py | 49 +++++++++++++++++++------
4 files changed, 70 insertions(+), 12 deletions(-)
diff --git a/dev/dev-requirements.txt b/dev/dev-requirements.txt
index 7fd1aeb..4ed964e 100755
--- a/dev/dev-requirements.txt
+++ b/dev/dev-requirements.txt
@@ -26,3 +26,5 @@ numpy>=1.22.4
python-dateutil>=2.8.0,<3
pytz>=2018.3
pytest~=7.0
+duckdb>=0.5.0,<2.0.0
+ray~=2.10.0
diff --git a/paimon_python_api/table_read.py b/paimon_python_api/table_read.py
index 24095b4..f0a7b59 100644
--- a/paimon_python_api/table_read.py
+++ b/paimon_python_api/table_read.py
@@ -18,10 +18,12 @@
import pandas as pd
import pyarrow as pa
+import ray
from abc import ABC, abstractmethod
+from duckdb.duckdb import DuckDBPyConnection
from paimon_python_api import Split
-from typing import List
+from typing import List, Optional
class TableRead(ABC):
@@ -38,3 +40,15 @@ class TableRead(ABC):
@abstractmethod
def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
"""Read data from splits and converted to pandas.DataFrame format."""
+
+ @abstractmethod
+ def to_duckdb(
+ self,
+ splits: List[Split],
+ table_name: str,
+            connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+ """Convert splits into an in-memory DuckDB table which can be
queried."""
+
+ @abstractmethod
+ def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+ """Convert splits into a Ray dataset format."""
diff --git a/paimon_python_java/pypaimon.py b/paimon_python_java/pypaimon.py
index b884fa4..803540c 100644
--- a/paimon_python_java/pypaimon.py
+++ b/paimon_python_java/pypaimon.py
@@ -16,9 +16,12 @@
# limitations under the License.
################################################################################
+import duckdb
import pandas as pd
import pyarrow as pa
+import ray
+from duckdb.duckdb import DuckDBPyConnection
from paimon_python_java.java_gateway import get_gateway
from paimon_python_java.util import java_utils, constants
from paimon_python_api import (catalog, table, read_builder, table_scan,
split, table_read,
@@ -161,6 +164,18 @@ class TableRead(table_read.TableRead):
def to_pandas(self, splits: List[Split]) -> pd.DataFrame:
return self.to_arrow(splits).to_pandas()
+ def to_duckdb(
+ self,
+ splits: List[Split],
+ table_name: str,
+            connection: Optional[DuckDBPyConnection] = None) -> DuckDBPyConnection:
+ con = connection or duckdb.connect(database=":memory:")
+ con.register(table_name, self.to_arrow(splits))
+ return con
+
+ def to_ray(self, splits: List[Split]) -> ray.data.dataset.Dataset:
+ return ray.data.from_arrow(self.to_arrow(splits))
+
def _init(self):
if self._j_bytes_reader is None:
# get thread num
diff --git a/paimon_python_java/tests/test_write_and_read.py b/paimon_python_java/tests/test_write_and_read.py
index 337b9f5..e2c631d 100644
--- a/paimon_python_java/tests/test_write_and_read.py
+++ b/paimon_python_java/tests/test_write_and_read.py
@@ -267,6 +267,12 @@ class TableWriteReadTest(unittest.TestCase):
table_write.close()
table_commit.close()
+ all_data = pd.DataFrame({
+ 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
+ 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
+ })
+ all_data['f0'] = all_data['f0'].astype('int32')
+
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
@@ -274,10 +280,7 @@ class TableWriteReadTest(unittest.TestCase):
# to_arrow
actual = table_read.to_arrow(splits)
- expected = pa.Table.from_pydict({
- 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
- 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
- }, schema=self.simple_pa_schema)
+ expected = pa.Table.from_pandas(all_data, schema=self.simple_pa_schema)
self.assertEqual(actual, expected)
# to_arrow_batch_reader
@@ -286,18 +289,42 @@ class TableWriteReadTest(unittest.TestCase):
for batch in table_read.to_arrow_batch_reader(splits)
]
actual = pd.concat(data_frames)
- expected = pd.DataFrame({
- 'f0': [1, 2, 3, 4, 5, 6, 7, 8, 9],
- 'f1': ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i'],
- })
- expected['f0'] = expected['f0'].astype('int32')
pd.testing.assert_frame_equal(
- actual.reset_index(drop=True), expected.reset_index(drop=True))
+ actual.reset_index(drop=True), all_data.reset_index(drop=True))
# to_pandas
actual = table_read.to_pandas(splits)
pd.testing.assert_frame_equal(
- actual.reset_index(drop=True), expected.reset_index(drop=True))
+ actual.reset_index(drop=True), all_data.reset_index(drop=True))
+
+ # to_duckdb
+ duckdb_con = table_read.to_duckdb(splits, 'duckdb_table')
+ # select *
+ result1 = duckdb_con.query("SELECT * FROM duckdb_table").fetchdf()
+ pd.testing.assert_frame_equal(
+ result1.reset_index(drop=True), all_data.reset_index(drop=True))
+ # select * where
+        result2 = duckdb_con.query("SELECT * FROM duckdb_table WHERE f0 < 4").fetchdf()
+ expected2 = pd.DataFrame({
+ 'f0': [1, 2, 3],
+ 'f1': ['a', 'b', 'c']
+ })
+ expected2['f0'] = expected2['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ result2.reset_index(drop=True), expected2.reset_index(drop=True))
+ # select f0 where
+        result3 = duckdb_con.query("SELECT f0 FROM duckdb_table WHERE f0 < 4").fetchdf()
+ expected3 = pd.DataFrame({
+ 'f0': [1, 2, 3]
+ })
+ expected3['f0'] = expected3['f0'].astype('int32')
+ pd.testing.assert_frame_equal(
+ result3.reset_index(drop=True), expected3.reset_index(drop=True))
+
+ # to_ray
+ ray_dataset = table_read.to_ray(splits)
+ pd.testing.assert_frame_equal(
+            ray_dataset.to_pandas().reset_index(drop=True), all_data.reset_index(drop=True))
def test_overwrite(self):
schema = Schema(self.simple_pa_schema, partition_keys=['f0'],