This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git
The following commit(s) were added to refs/heads/main by this push:
new 75d00d7 #38 Expose More Metadata in Object APIs (#39)
75d00d7 is described below
commit 75d00d7ad94cc5fa83b03b4d4aa1f401429f0224
Author: ChengHui Chen <[email protected]>
AuthorDate: Wed Feb 19 15:27:43 2025 +0800
#38 Expose More Metadata in Object APIs (#39)
---
pypaimon/api/read_builder.py | 9 ++++
pypaimon/api/{split.py => row_type.py} | 12 +++--
pypaimon/api/split.py | 14 +++++-
pypaimon/py4j/java_implementation.py | 31 +++++++++++-
pypaimon/py4j/tests/test_object_metadata.py | 73 +++++++++++++++++++++++++++++
5 files changed, 134 insertions(+), 5 deletions(-)
diff --git a/pypaimon/api/read_builder.py b/pypaimon/api/read_builder.py
index 68b7d46..cd5715b 100644
--- a/pypaimon/api/read_builder.py
+++ b/pypaimon/api/read_builder.py
@@ -20,6 +20,8 @@ from abc import ABC, abstractmethod
from pypaimon.api import TableRead, TableScan, Predicate, PredicateBuilder
from typing import List
+from pypaimon.api.row_type import RowType
+
class ReadBuilder(ABC):
"""An interface for building the TableScan and TableRead."""
@@ -50,3 +52,10 @@ class ReadBuilder(ABC):
@abstractmethod
def new_predicate_builder(self) -> PredicateBuilder:
"""Create a builder for Predicate."""
+
+ @abstractmethod
+ def read_type(self) -> RowType:
+ """
+ Return the row type of the builder. If there is a projection inside
+ the builder, the row type will only contain the selected fields.
+ """
diff --git a/pypaimon/api/split.py b/pypaimon/api/row_type.py
similarity index 80%
copy from pypaimon/api/split.py
copy to pypaimon/api/row_type.py
index 386c72b..b15c413 100644
--- a/pypaimon/api/split.py
+++ b/pypaimon/api/row_type.py
@@ -16,8 +16,14 @@
# limitations under the License.
#################################################################################
-from abc import ABC
+import pyarrow as pa
+from abc import ABC, abstractmethod
-class Split(ABC):
- """An input split for reading. The most important subclass is DataSplit."""
+
+class RowType(ABC):
+ """Data type of a sequence of fields."""
+
+ @abstractmethod
+ def as_arrow(self) -> "pa.Schema":
+ """Return the row type as an Arrow schema."""
diff --git a/pypaimon/api/split.py b/pypaimon/api/split.py
index 386c72b..5b9115c 100644
--- a/pypaimon/api/split.py
+++ b/pypaimon/api/split.py
@@ -16,8 +16,20 @@
# limitations under the License.
#################################################################################
-from abc import ABC
+from abc import ABC, abstractmethod
+
+from typing import Iterator
class Split(ABC):
"""An input split for reading. The most important subclass is DataSplit."""
+
+ @abstractmethod
+ def row_count(self) -> int:
+ """Return the total row count of the split."""
+
+ def file_size(self) -> int:
+ """Return the total file size of the split."""
+
+ def file_paths(self) -> Iterator[str]:
+ """Return the paths of all raw files in the split."""
diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py
index ce90bc5..9f378b7 100644
--- a/pypaimon/py4j/java_implementation.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -24,7 +24,7 @@ import pyarrow as pa
from pypaimon.py4j.java_gateway import get_gateway
from pypaimon.py4j.util import java_utils, constants
from pypaimon.api import \
- (catalog, table, read_builder, table_scan, split,
+ (catalog, table, read_builder, table_scan, split, row_type,
table_read, write_builder, table_write, commit_message,
table_commit, Schema, predicate)
from typing import List, Iterator, Optional, Any, TYPE_CHECKING
@@ -115,6 +115,18 @@ class ReadBuilder(read_builder.ReadBuilder):
def new_predicate_builder(self) -> 'PredicateBuilder':
return PredicateBuilder(self._j_row_type)
+ def read_type(self) -> 'RowType':
+ return RowType(self._j_read_builder.readType())
+
+
+class RowType(row_type.RowType):
+
+ def __init__(self, j_row_type):
+ self._j_row_type = j_row_type
+
+ def as_arrow(self) -> "pa.Schema":
+ return java_utils.to_arrow_schema(self._j_row_type)
+
class TableScan(table_scan.TableScan):
@@ -144,6 +156,23 @@ class Split(split.Split):
def to_j_split(self):
return self._j_split
+ def row_count(self) -> int:
+ return self._j_split.rowCount()
+
+ def file_size(self) -> int:
+ files_optional = self._j_split.convertToRawFiles()
+ if not files_optional.isPresent():
+ return 0
+ files = files_optional.get()
+ return sum(file.length() for file in files)
+
+ def file_paths(self) -> List[str]:
+ files_optional = self._j_split.convertToRawFiles()
+ if not files_optional.isPresent():
+ return []
+ files = files_optional.get()
+ return [file.path() for file in files]
+
class TableRead(table_read.TableRead):
diff --git a/pypaimon/py4j/tests/test_object_metadata.py b/pypaimon/py4j/tests/test_object_metadata.py
new file mode 100644
index 0000000..e3591c9
--- /dev/null
+++ b/pypaimon/py4j/tests/test_object_metadata.py
@@ -0,0 +1,73 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+import os
+import pyarrow as pa
+
+from pypaimon import Schema
+from pypaimon.py4j.tests import PypaimonTestBase
+
+
+class ObjectInfoTest(PypaimonTestBase):
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ cls.simple_pa_schema = pa.schema([
+ ('f0', pa.int32()),
+ ('f1', pa.string())
+ ])
+
+ def test_read_type_metadata(self):
+ schema = Schema(self.simple_pa_schema)
+        self.catalog.create_table('default.test_read_type_metadata', schema, False)
+ table = self.catalog.get_table('default.test_read_type_metadata')
+
+ read_builder = table.new_read_builder()
+ read_builder.with_projection(['f1'])
+ pa_schema = read_builder.read_type().as_arrow()
+
+ self.assertEqual(len(pa_schema.names), 1)
+ self.assertEqual(pa_schema.names[0], 'f1')
+
+ def test_split_metadata(self):
+ schema = Schema(self.simple_pa_schema)
+ self.catalog.create_table('default.test_split_metadata', schema, False)
+ table = self.catalog.get_table('default.test_split_metadata')
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+ data = {
+ 'f0': [1, 2, 3, 4, 5],
+ 'f1': ['a', 'b', 'c', 'd', 'e'],
+ }
+ pa_table = pa.Table.from_pydict(data, schema=self.simple_pa_schema)
+ table_write.write_arrow(pa_table)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ splits = table_scan.plan().splits()
+
+ self.assertEqual(len(splits), 1)
+ self.assertEqual(len(splits[0].file_paths()), 1)
+ self.assertEqual(splits[0].row_count(), 5)
+ self.assertTrue(splits[0].file_paths()[0].endswith('.parquet'))
+        self.assertEqual(splits[0].file_size(), os.path.getsize(splits[0].file_paths()[0]))