This is an automated email from the ASF dual-hosted git repository.

yuzelin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git


The following commit(s) were added to refs/heads/main by this push:
     new af06c70  #46 Improve Readability of TableRead Implementation (#47)
af06c70 is described below

commit af06c70b65a27de6163b577c0ae9e38eeb87954a
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Mar 20 14:59:57 2025 +0800

    #46 Improve Readability of TableRead Implementation (#47)
---
 pypaimon/py4j/java_implementation.py | 27 +++++++++------------------
 1 file changed, 9 insertions(+), 18 deletions(-)

diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py
index 9a13037..6801c71 100644
--- a/pypaimon/py4j/java_implementation.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -186,18 +186,15 @@ class Split(split.Split):
 class TableRead(table_read.TableRead):
 
     def __init__(self, j_table_read, j_read_type, catalog_options):
-        self._j_table_read = j_table_read
-        self._j_read_type = j_read_type
-        self._catalog_options = catalog_options
-        self._j_bytes_reader = None
         self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
+        self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
+            j_table_read, j_read_type, TableRead._get_max_workers(catalog_options))
 
     def to_arrow(self, splits):
         record_batch_reader = self.to_arrow_batch_reader(splits)
         return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
 
     def to_arrow_batch_reader(self, splits):
-        self._init()
         j_splits = list(map(lambda s: s.to_j_split(), splits))
         self._j_bytes_reader.setSplits(j_splits)
         batch_iterator = self._batch_generator()
@@ -222,19 +219,13 @@ class TableRead(table_read.TableRead):
 
         return ray.data.from_arrow(self.to_arrow(splits))
 
-    def _init(self):
-        if self._j_bytes_reader is None:
-            # get thread num
-            max_workers = self._catalog_options.get(constants.MAX_WORKERS)
-            if max_workers is None:
-                # default is sequential
-                max_workers = 1
-            else:
-                max_workers = int(max_workers)
-            if max_workers <= 0:
-                raise ValueError("max_workers must be greater than 0")
-            self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
-                self._j_table_read, self._j_read_type, max_workers)
+    @staticmethod
+    def _get_max_workers(catalog_options):
+        # default is sequential
+        max_workers = int(catalog_options.get(constants.MAX_WORKERS, 1))
+        if max_workers <= 0:
+            raise ValueError("max_workers must be greater than 0")
+        return max_workers
 
     def _batch_generator(self) -> Iterator[pa.RecordBatch]:
         while True:

Reply via email to