JingsongLi commented on code in PR #6194:
URL: https://github.com/apache/paimon/pull/6194#discussion_r2321366912
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -288,36 +287,46 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            import pyarrow as pa
+            # Convert RecordBatch to Table if necessary for PyArrow 6.0.1 compatibility
+            if isinstance(data, pa.RecordBatch):
+                table = pa.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
-                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
-                    pw.write_batch(data)
+                pq.write_table(table, output_stream, **kwargs)
Review Comment:
Need to pass compression.
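A minimal sketch of forwarding it (`pq.write_table` accepts a `compression` keyword, so the method's argument can be passed straight through):

    with self.new_output_stream(path) as output_stream:
        # Forward the method's compression argument instead of dropping it;
        # pq.write_table accepts values such as 'snappy' or 'zstd'.
        pq.write_table(table, output_stream, compression=compression, **kwargs)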
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -288,36 +287,46 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            import pyarrow as pa
+            # Convert RecordBatch to Table if necessary for PyArrow 6.0.1 compatibility
+            if isinstance(data, pa.RecordBatch):
+                table = pa.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
-                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
-                    pw.write_batch(data)
+                pq.write_table(table, output_stream, **kwargs)
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e

-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, table: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
         try:
+            """Write ORC file using PyArrow ORC writer."""
+            import pyarrow as pa
             import pyarrow.orc as orc
-            table = pyarrow.Table.from_batches([data])
+            # Convert RecordBatch to Table if necessary
+            if isinstance(table, pa.RecordBatch):
+                table = pa.Table.from_batches([table])
+
             with self.new_output_stream(path) as output_stream:
-                orc.write_table(
-                    table,
-                    output_stream,
-                    compression=compression,
-                    **kwargs
-                )
+                # PyArrow ORC writer doesn't support compression parameter directly
+                # Remove compression from kwargs to avoid TypeError
+                orc_kwargs = {k: v for k, v in kwargs.items() if k != 'compression'}
Review Comment:
How to deal with compression?
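One possible approach, assuming the ORC writer gained a `compression` keyword in later PyArrow releases (around 7.0), is to gate on the installed version instead of silently discarding the setting:

    import pyarrow as pa

    # Assumption: orc.write_table accepts a compression keyword only on
    # newer PyArrow; on older versions fall back to the writer's default.
    major = int(pa.__version__.split('.')[0])
    if major >= 7:
        orc.write_table(table, output_stream, compression=compression, **orc_kwargs)
    else:
        orc.write_table(table, output_stream, **orc_kwargs)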
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -288,36 +287,46 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            import pyarrow as pa
+            # Convert RecordBatch to Table if necessary for PyArrow 6.0.1 compatibility
+            if isinstance(data, pa.RecordBatch):
Review Comment:
No need for the `isinstance` check.
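Since `data` is typed as `pyarrow.RecordBatch`, the conversion can be unconditional, e.g.:

    # pa.Table.from_batches accepts the RecordBatch directly,
    # so the type check is redundant.
    table = pa.Table.from_batches([data])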
##########
paimon-python/pypaimon/common/json_util.py:
##########
@@ -17,7 +17,18 @@
 import json
 from dataclasses import field, fields, is_dataclass
-from typing import Any, Dict, Type, TypeVar, Union, get_args, get_origin
+from typing import Any, Dict, Type, TypeVar, Union, List
+
+# Python 3.6 compatibility for get_args and get_origin
+try:
+    from typing import get_args, get_origin
+except ImportError:
+    # Fallback for Python < 3.8
+    def get_origin(tp):
Review Comment:
You can just use `__origin__` in the code.
##########
paimon-python/pypaimon/common/json_util.py:
##########
@@ -17,7 +17,18 @@
 import json
 from dataclasses import field, fields, is_dataclass
-from typing import Any, Dict, Type, TypeVar, Union, get_args, get_origin
+from typing import Any, Dict, Type, TypeVar, Union, List
+
+# Python 3.6 compatibility for get_args and get_origin
+try:
+    from typing import get_args, get_origin
+except ImportError:
+    # Fallback for Python < 3.8
+    def get_origin(tp):
+        return getattr(tp, '__origin__', None)
+
+    def get_args(tp):
+        return getattr(tp, '__args__', ())
Review Comment:
You can just use `__args__` in the code.
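Both shims could then be dropped in favor of inline attribute access at the call sites, roughly:

    # Works on Python 3.6+ without importing get_origin/get_args;
    # getattr's default covers plain (non-generic) annotations.
    origin = getattr(tp, '__origin__', None)
    args = getattr(tp, '__args__', ())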
##########
paimon-python/pypaimon/common/file_io.py:
##########
@@ -288,36 +287,46 @@ def read_overwritten_file_utf8(self, path: Path) -> Optional[str]:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            import pyarrow as pa
+            # Convert RecordBatch to Table if necessary for PyArrow 6.0.1 compatibility
+            if isinstance(data, pa.RecordBatch):
+                table = pa.Table.from_batches([data])
             with self.new_output_stream(path) as output_stream:
-                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
-                    pw.write_batch(data)
+                pq.write_table(table, output_stream, **kwargs)
         except Exception as e:
             self.delete_quietly(path)
             raise RuntimeError(f"Failed to write Parquet file {path}: {e}") from e

-    def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
+    def write_orc(self, path: Path, table: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
         try:
+            """Write ORC file using PyArrow ORC writer."""
+            import pyarrow as pa
             import pyarrow.orc as orc
-            table = pyarrow.Table.from_batches([data])
+            # Convert RecordBatch to Table if necessary
+            if isinstance(table, pa.RecordBatch):
Review Comment:
Ditto
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]