This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/paimon-python.git
The following commit(s) were added to refs/heads/main by this push:
new af06c70 #46 Improve Readability of TableRead Implementation (#47)
af06c70 is described below
commit af06c70b65a27de6163b577c0ae9e38eeb87954a
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Mar 20 14:59:57 2025 +0800
#46 Improve Readability of TableRead Implementation (#47)
---
pypaimon/py4j/java_implementation.py | 27 +++++++++------------------
1 file changed, 9 insertions(+), 18 deletions(-)
diff --git a/pypaimon/py4j/java_implementation.py b/pypaimon/py4j/java_implementation.py
index 9a13037..6801c71 100644
--- a/pypaimon/py4j/java_implementation.py
+++ b/pypaimon/py4j/java_implementation.py
@@ -186,18 +186,15 @@ class Split(split.Split):
 class TableRead(table_read.TableRead):
 
     def __init__(self, j_table_read, j_read_type, catalog_options):
-        self._j_table_read = j_table_read
-        self._j_read_type = j_read_type
-        self._catalog_options = catalog_options
-        self._j_bytes_reader = None
         self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
+        self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
+            j_table_read, j_read_type, TableRead._get_max_workers(catalog_options))
 
     def to_arrow(self, splits):
         record_batch_reader = self.to_arrow_batch_reader(splits)
         return pa.Table.from_batches(record_batch_reader, schema=self._arrow_schema)
 
     def to_arrow_batch_reader(self, splits):
-        self._init()
         j_splits = list(map(lambda s: s.to_j_split(), splits))
         self._j_bytes_reader.setSplits(j_splits)
         batch_iterator = self._batch_generator()
@@ -222,19 +219,13 @@ class TableRead(table_read.TableRead):
         return ray.data.from_arrow(self.to_arrow(splits))
 
-    def _init(self):
-        if self._j_bytes_reader is None:
-            # get thread num
-            max_workers = self._catalog_options.get(constants.MAX_WORKERS)
-            if max_workers is None:
-                # default is sequential
-                max_workers = 1
-            else:
-                max_workers = int(max_workers)
-            if max_workers <= 0:
-                raise ValueError("max_workers must be greater than 0")
-            self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
-                self._j_table_read, self._j_read_type, max_workers)
+    @staticmethod
+    def _get_max_workers(catalog_options):
+        # default is sequential
+        max_workers = int(catalog_options.get(constants.MAX_WORKERS, 1))
+        if max_workers <= 0:
+            raise ValueError("max_workers must be greater than 0")
+        return max_workers
 
     def _batch_generator(self) -> Iterator[pa.RecordBatch]:
         while True:
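
For reference, the refactored methods assemble from the hunks above as
follows (a sketch only; imports, the table_read/java_utils/constants
modules, and the rest of the TableRead class are assumed from
pypaimon/py4j/java_implementation.py):

    class TableRead(table_read.TableRead):

        def __init__(self, j_table_read, j_read_type, catalog_options):
            # Build the Arrow schema and the parallel bytes reader eagerly,
            # replacing the old lazy _init() path guarded by a None check.
            self._arrow_schema = java_utils.to_arrow_schema(j_read_type)
            self._j_bytes_reader = get_gateway().jvm.InvocationUtil.createParallelBytesReader(
                j_table_read, j_read_type, TableRead._get_max_workers(catalog_options))

        @staticmethod
        def _get_max_workers(catalog_options):
            # default is sequential: one worker when the option is unset
            max_workers = int(catalog_options.get(constants.MAX_WORKERS, 1))
            if max_workers <= 0:
                raise ValueError("max_workers must be greater than 0")
            return max_workers

Assuming catalog_options behaves like a plain dict keyed by the option name
in constants.MAX_WORKERS, _get_max_workers({}) returns 1 (sequential) and a
setting of "4" yields four workers; zero or negative settings now fail fast
in the constructor rather than on the first read.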