JingsongLi commented on code in PR #6194:
URL: https://github.com/apache/paimon/pull/6194#discussion_r2321437542
##########
paimon-python/pypaimon/common/predicate.py:
##########
@@ -206,13 +205,41 @@ def to_arrow(self) -> pyarrow_compute.Expression | bool:
             return ~pyarrow_dataset.field(self.field).isin(self.literals)
         elif self.method == 'startsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.starts_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility - improved approach
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.starts_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to Python filtering - create a condition that allows all rows
+                # to be processed by Python filter later
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
Review Comment:
Just `return True` here.
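What that amounts to, as a minimal sketch (the branch and names come from the hunk above; collapsing the fallback to a bare True is the suggested change, not code from the PR):

# Sketch only: since to_arrow() is typed `pyarrow_compute.Expression | bool`,
# the except branch can simply return True, meaning "no pushdown, keep all
# rows and let the later Python-level filter handle them".
import pyarrow
import pyarrow.compute as pyarrow_compute
import pyarrow.dataset as pyarrow_dataset

def starts_with_expr(field: str, pattern: str):
    try:
        string_field = pyarrow_dataset.field(field).cast(pyarrow.string())
        return pyarrow_compute.starts_with(string_field, pattern)
    except Exception:
        return True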
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -288,36 +287,39 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            import pyarrow as pa
+            table = pa.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
-                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
-                    pw.write_batch(data)
+                pq.write_table(table, output_stream, compression=compression, **kwargs)
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e

-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, table: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
         try:
+            """Write ORC file using PyArrow ORC writer."""
+            import pyarrow as pa
             import pyarrow.orc as orc
-            table = pyarrow.Table.from_batches([data])
+            table = pa.Table.from_batches([table])
+
             with self.new_output_stream(path) as output_stream:
-                orc.write_table(
-                    table,
-                    output_stream,
-                    compression=compression,
-                    **kwargs
-                )
+                orc.write_table(table, output_stream, **kwargs)
Review Comment:
If Python is 3.8, we should still add `compression`.
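One hedged way to do that while still supporting the older PyArrow pinned for Python 3.8 (write_orc_table here is a hypothetical helper, not code from this PR): forward compression only when the installed orc.write_table actually accepts it.

# Hypothetical sketch: keep the compression argument, but only pass it through
# when the installed pyarrow.orc.write_table exposes that keyword (older
# builds pinned for Python 3.8 may not).
import inspect

import pyarrow as pa
import pyarrow.orc as orc

def write_orc_table(table: pa.Table, output_stream, compression: str = 'zstd', **kwargs):
    if 'compression' in inspect.signature(orc.write_table).parameters:
        orc.write_table(table, output_stream, compression=compression, **kwargs)
    else:
        orc.write_table(table, output_stream, **kwargs)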
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -288,36 +287,39 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            import pyarrow as pa
+            table = pa.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
-                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
-                    pw.write_batch(data)
+                pq.write_table(table, output_stream, compression=compression, **kwargs)
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e

-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, table: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
         try:
+            """Write ORC file using PyArrow ORC writer."""
+            import pyarrow as pa
             import pyarrow.orc as orc
-            table = pyarrow.Table.from_batches([data])
+            table = pa.Table.from_batches([table])
+
             with self.new_output_stream(path) as output_stream:
-                orc.write_table(
-                    table,
-                    output_stream,
-                    compression=compression,
-                    **kwargs
-                )
+                orc.write_table(table, output_stream, **kwargs)
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write ORC file {path}: {e}") from e

     def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
         import fastavro
-
         if avro_schema is None:
+            from pypaimon.schema.data_types import PyarrowFieldParser
             avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
-        records = data.to_pylist()
+
+        records_dict = data.to_pydict()
+        records = [{col: records_dict[col][i] for col in records_dict.keys()}
Review Comment:
You should implement a `RecordBatchIterable`.
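A possible shape for that, as a sketch rather than the PR's code (the class name follows the comment; the implementation details are assumed): yield one dict per row lazily, since fastavro.writer accepts any iterable of dicts.

# Assumed implementation sketch of a lazy RecordBatchIterable: rows are
# produced on demand instead of materializing data.to_pydict() up front.
import pyarrow as pa

class RecordBatchIterable:
    def __init__(self, batch: pa.RecordBatch):
        self._batch = batch

    def __iter__(self):
        names = self._batch.schema.names
        columns = [self._batch.column(i) for i in range(self._batch.num_columns)]
        for row in range(self._batch.num_rows):
            # as_py() converts each Arrow scalar to a plain Python value.
            yield {name: column[row].as_py() for name, column in zip(names, columns)}

fastavro.writer(output_stream, avro_schema, RecordBatchIterable(data)) would then stream the batch without building the full records list in memory.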
##########
paimon-python/pypaimon/common/predicate.py:
##########
@@ -206,13 +205,41 @@ def to_arrow(self) -> pyarrow_compute.Expression | bool:
             return ~pyarrow_dataset.field(self.field).isin(self.literals)
         elif self.method == 'startsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.starts_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility - improved approach
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.starts_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to Python filtering - create a condition that allows all rows
+                # to be processed by Python filter later
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         elif self.method == 'endsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.ends_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.ends_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to Python filtering
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         elif self.method == 'contains':
             pattern = self.literals[0]
-            return pyarrow_compute.match_substring(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.match_substring(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to Python filtering
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
Review Comment:
Just `return True` here.
##########
paimon-python/pypaimon/read/reader/iface/record_batch_reader.py:
##########
@@ -45,19 +45,45 @@ def _read_next_df(self) -> Optional[polars.DataFrame]:
         arrow_batch = self.read_arrow_batch()
         if arrow_batch is None:
             return None
-        return polars.from_arrow(arrow_batch)
+        # Convert RecordBatch to Table for Polars compatibility with PyArrow 5.0.0
+        import pyarrow as pa
+        if hasattr(arrow_batch, 'num_rows'):
Review Comment:
Why is this change needed here?
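For context on the question, a quick check rather than a claim about the pinned versions: recent polars.from_arrow accepts a RecordBatch directly, so the Table conversion may only be needed for the older PyArrow/Polars combination the comment mentions.

# Sanity check (assumes a recent polars; the pinned versions may differ):
import polars
import pyarrow as pa

batch = pa.RecordBatch.from_pydict({"a": [1, 2, 3]})
df = polars.from_arrow(batch)  # works without converting to a Table first
print(df.shape)  # (3, 1)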
##########
paimon-python/pypaimon/read/reader/data_file_record_reader.py:
##########
@@ -45,6 +49,9 @@ def read_arrow_batch(self) -> Optional[RecordBatch]:
             return None
         if self.partition_info is None and self.index_mapping is None:
+            # Still need to apply Python-level filtering even when no partition/mapping is needed
+            if self.python_predicate is not None:
+                record_batch = self._python_filter(record_batch)
Review Comment:
No need to filter here.
##########
paimon-python/pypaimon/common/predicate.py:
##########
@@ -206,13 +205,41 @@ def to_arrow(self) -> pyarrow_compute.Expression | bool:
             return ~pyarrow_dataset.field(self.field).isin(self.literals)
         elif self.method == 'startsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.starts_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility - improved approach
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.starts_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to Python filtering - create a condition that allows all rows
+                # to be processed by Python filter later
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         elif self.method == 'endsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.ends_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.ends_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to Python filtering
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
Review Comment:
Just `return True` here.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]