This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 624bfda094 [python] Support reading and writing append table under python 3.6 (#6194)
624bfda094 is described below
commit 624bfda094f3265be9bc0ac652182fcf364ec0fc
Author: umi <[email protected]>
AuthorDate: Thu Sep 4 21:52:09 2025 +0800
[python] Support reading and writing append table under python 3.6 (#6194)
---
paimon-python/dev/lint-python.sh | 19 +-
paimon-python/pypaimon/common/file_io.py | 37 ++-
paimon-python/pypaimon/common/json_util.py | 14 +-
paimon-python/pypaimon/common/predicate.py | 36 ++-
paimon-python/pypaimon/read/push_down_utils.py | 3 +-
.../pypaimon/read/reader/format_avro_reader.py | 5 +-
.../pypaimon/read/reader/format_pyarrow_reader.py | 6 +-
paimon-python/pypaimon/read/table_read.py | 9 +-
paimon-python/pypaimon/tests/py36/__init__.py | 17 ++
.../pypaimon/tests/py36/ao_predicate_test.py | 247 +++++++++++++++++++++
.../pypaimon/tests/py36/ao_read_write_test.py | 115 ++++++++++
.../pypaimon/tests/py36/pyarrow_compat.py | 40 ++++
paimon-python/pypaimon/tests/rest_server.py | 2 +-
paimon-python/pypaimon/write/writer/data_writer.py | 8 +-
14 files changed, 514 insertions(+), 44 deletions(-)
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index ae214d35f6..9cbb612137 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -162,16 +162,28 @@ function flake8_check() {
# Pytest check
function pytest_check() {
-
print_function "STAGE" "pytest checks"
if [ ! -f "$PYTEST_PATH" ]; then
echo "For some unknown reasons, the pytest package is not complete."
fi
+ # Get Python version
+ PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
+ echo "Detected Python version: $PYTHON_VERSION"
+
+ # Determine test directory based on Python version
+ if [ "$PYTHON_VERSION" = "3.6" ]; then
+ TEST_DIR="pypaimon/tests/py36"
+ echo "Running tests for Python 3.6: $TEST_DIR"
+ else
+ TEST_DIR="pypaimon/tests --ignore=pypaimon/tests/py36"
+ echo "Running tests for Python $PYTHON_VERSION (excluding py36):
pypaimon/tests --ignore=pypaimon/tests/py36"
+ fi
+
# the return value of a pipeline is the status of the last command to exit
# with a non-zero status or zero if no command exited with a non-zero status
set -o pipefail
- ($PYTEST_PATH) 2>&1 | tee -a $LOG_FILE
+ ($PYTEST_PATH $TEST_DIR) 2>&1 | tee -a $LOG_FILE
PYCODESTYLE_STATUS=$?
if [ $PYCODESTYLE_STATUS -ne 0 ]; then
@@ -264,5 +276,4 @@ done
# collect checks according to the options
collect_checks
# run checks
-check_stage
-
+check_stage
\ No newline at end of file
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f1b40e2239..8830a6a537 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -27,7 +27,6 @@ import pyarrow
from pyarrow._fs import FileSystem
from pypaimon.common.config import OssOptions, S3Options
-from pypaimon.schema.data_types import PyarrowFieldParser
class FileIO:
@@ -288,10 +287,10 @@ class FileIO:
def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
try:
import pyarrow.parquet as pq
+ table = pyarrow.Table.from_batches([data])
with self.new_output_stream(path) as output_stream:
- with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
- pw.write_batch(data)
+ pq.write_table(table, output_stream, compression=compression, **kwargs)
except Exception as e:
self.delete_quietly(path)
@@ -299,15 +298,22 @@ class FileIO:
def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
try:
+ """Write ORC file using PyArrow ORC writer."""
+ import sys
import pyarrow.orc as orc
table = pyarrow.Table.from_batches([data])
+
with self.new_output_stream(path) as output_stream:
- orc.write_table(
- table,
- output_stream,
- compression=compression,
- **kwargs
- )
+ # Check Python version - if 3.6, don't use compression parameter
+ if sys.version_info[:2] == (3, 6):
+ orc.write_table(table, output_stream, **kwargs)
+ else:
+ orc.write_table(
+ table,
+ output_stream,
+ compression=compression,
+ **kwargs
+ )
except Exception as e:
self.delete_quietly(path)
@@ -315,9 +321,18 @@ class FileIO:
def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
import fastavro
-
if avro_schema is None:
+ from pypaimon.schema.data_types import PyarrowFieldParser
avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
- records = data.to_pylist()
+
+ records_dict = data.to_pydict()
+
+ def record_generator():
+ num_rows = len(list(records_dict.values())[0])
+ for i in range(num_rows):
+ yield {col: records_dict[col][i] for col in records_dict.keys()}
+
+ records = record_generator()
+
with self.new_output_stream(path) as output_stream:
fastavro.writer(output_stream, avro_schema, records, **kwargs)
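Note on the file_io.py changes above: the change implies that RecordBatch.to_pylist is not available on the older PyArrow paired with Python 3.6, so write_avro now builds row dicts lazily from to_pydict, and the ORC branch skips the compression keyword that the older pyarrow.orc.write_table does not accept. A minimal, self-contained sketch of the generator pattern fastavro consumes:

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2]), pa.array(['a', 'b'])], names=['id', 'name'])
    columns = batch.to_pydict()  # {'id': [1, 2], 'name': ['a', 'b']}

    def rows():
        # yield one dict per row, which is the shape fastavro.writer expects
        for i in range(batch.num_rows):
            yield {col: columns[col][i] for col in columns}

    # fastavro.writer(stream, avro_schema, rows()) would then stream these rows.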
diff --git a/paimon-python/pypaimon/common/json_util.py b/paimon-python/pypaimon/common/json_util.py
index 2f007cd230..5f4d513a27 100644
--- a/paimon-python/pypaimon/common/json_util.py
+++ b/paimon-python/pypaimon/common/json_util.py
@@ -17,7 +17,7 @@
import json
from dataclasses import field, fields, is_dataclass
-from typing import Any, Dict, Type, TypeVar, Union, get_args, get_origin
+from typing import Any, Dict, Type, TypeVar, Union, List
T = TypeVar("T")
@@ -86,14 +86,14 @@ class JSON:
for field_info in fields(target_class):
json_name = field_info.metadata.get("json_name", field_info.name)
field_mapping[json_name] = field_info.name
- origin_type = get_origin(field_info.type)
- args = get_args(field_info.type)
+ origin_type = getattr(field_info.type, '__origin__', None)
+ args = getattr(field_info.type, '__args__', None)
field_type = field_info.type
if origin_type is Union and len(args) == 2:
field_type = args[0]
if is_dataclass(field_type):
type_mapping[json_name] = field_type
- elif origin_type is list and is_dataclass(args[0]):
+ elif origin_type in (list, List) and is_dataclass(args[0]):
type_mapping[json_name] = field_info.type
# Map JSON data to field names
@@ -102,9 +102,9 @@ class JSON:
if json_name in field_mapping:
field_name = field_mapping[json_name]
if json_name in type_mapping:
- tp = get_origin(type_mapping[json_name])
- if tp is list:
- item_type = get_args(type_mapping[json_name])[0]
+ tp = getattr(type_mapping[json_name], '__origin__', None)
+ if tp in (list, List):
+ item_type = getattr(type_mapping[json_name], '__args__', None)[0]
if is_dataclass(item_type):
kwargs[field_name] = [
item_type.from_dict(item)
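Note on the json_util.py changes above: typing.get_origin and typing.get_args only exist from Python 3.8, so the code falls back to the __origin__/__args__ attributes that typing generics already expose on 3.6. A small illustration of the equivalence (on 3.6, __origin__ can be typing.List rather than list, which is why the check accepts both):

    from typing import List

    tp = List[int]
    origin = getattr(tp, '__origin__', None)  # list on 3.7+, typing.List on 3.6
    args = getattr(tp, '__args__', None)      # (int,)
    is_list_type = origin in (list, List)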
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index ba56713032..ce4172148c 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -15,7 +15,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from __future__ import annotations
from dataclasses import dataclass
from functools import reduce
@@ -183,7 +182,7 @@ class Predicate:
else:
raise ValueError("Unsupported predicate method:
{}".format(self.method))
- def to_arrow(self) -> pyarrow_compute.Expression | bool:
+ def to_arrow(self) -> Any:
if self.method == 'equal':
return pyarrow_dataset.field(self.field) == self.literals[0]
elif self.method == 'notEqual':
@@ -206,13 +205,40 @@ class Predicate:
return ~pyarrow_dataset.field(self.field).isin(self.literals)
elif self.method == 'startsWith':
pattern = self.literals[0]
- return pyarrow_compute.starts_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+ # For PyArrow compatibility - improved approach
+ try:
+ field_ref = pyarrow_dataset.field(self.field)
+ # Ensure the field is cast to string type
+ string_field = field_ref.cast(pyarrow.string())
+ result = pyarrow_compute.starts_with(string_field, pattern)
+ return result
+ except Exception:
+ # Fallback to True
+ return True
elif self.method == 'endsWith':
pattern = self.literals[0]
- return pyarrow_compute.ends_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+ # For PyArrow compatibility
+ try:
+ field_ref = pyarrow_dataset.field(self.field)
+ # Ensure the field is cast to string type
+ string_field = field_ref.cast(pyarrow.string())
+ result = pyarrow_compute.ends_with(string_field, pattern)
+ return result
+ except Exception:
+ # Fallback to True
+ return True
elif self.method == 'contains':
pattern = self.literals[0]
- return pyarrow_compute.match_substring(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+ # For PyArrow compatibility
+ try:
+ field_ref = pyarrow_dataset.field(self.field)
+ # Ensure the field is cast to string type
+ string_field = field_ref.cast(pyarrow.string())
+ result = pyarrow_compute.match_substring(string_field, pattern)
+ return result
+ except Exception:
+ # Fallback to True
+ return True
elif self.method == 'between':
return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \
(pyarrow_dataset.field(self.field) <= self.literals[1])
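Note on the predicate.py changes above: the removed `from __future__ import annotations` import requires Python 3.7 (PEP 563), and the `pyarrow_compute.Expression | bool` annotation uses PEP 604 union syntax that only parses on 3.10+, so the return type is loosened to Any for 3.6. A small sketch of the 3.6-safe spelling:

    from typing import Any

    # 3.10+ only:  def to_arrow(self) -> pyarrow_compute.Expression | bool: ...
    # 3.6-safe:    loosen the annotation, as the patch does:
    def to_arrow_sketch(field_expr) -> Any:
        return field_expr  # either a pyarrow expression or a plain bool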
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 31e66973c6..95a99d9005 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -64,7 +64,8 @@ def extract_predicate_to_dict(result: Dict, input_predicate: 'Predicate', keys:
return
# condition: only one key for 'or', and the key belongs to keys
involved_fields = {p.field for p in input_predicate.literals}
- if len(involved_fields) == 1 and (field := involved_fields.pop()) in keys:
+ field = involved_fields.pop() if len(involved_fields) == 1 else None
+ if field is not None and field in keys:
result[field].append(input_predicate)
return
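Note: the removed `(field := involved_fields.pop())` relies on the assignment-expression (walrus) operator, a Python 3.8 feature, so the replacement unrolls it into a plain conditional assignment. A trivial standalone sketch of the equivalence:

    involved_fields = {'f0'}
    keys = ['f0', 'f1']

    # 3.8+: if len(involved_fields) == 1 and (field := involved_fields.pop()) in keys: ...
    field = involved_fields.pop() if len(involved_fields) == 1 else None
    if field is not None and field in keys:
        print('single-field predicate on', field)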
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 4ce7c04ed4..a9ca01a6e5 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -16,11 +16,10 @@
# limitations under the License.
################################################################################
-from typing import List, Optional
+from typing import List, Optional, Any
import fastavro
import pyarrow as pa
-import pyarrow.compute as pc
import pyarrow.dataset as ds
from pyarrow import RecordBatch
@@ -36,7 +35,7 @@ class FormatAvroReader(RecordBatchReader):
"""
def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
- push_down_predicate: pc.Expression | bool, batch_size: int = 4096):
+ push_down_predicate: Any, batch_size: int = 4096):
self._file = file_io.filesystem.open_input_file(file_path)
self._avro_reader = fastavro.reader(self._file)
self._batch_size = batch_size
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index ecef589391..d6c403b0f9 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -16,9 +16,8 @@
# limitations under the License.
################################################################################
-from typing import List, Optional
+from typing import List, Optional, Any
-import pyarrow.compute as pc
import pyarrow.dataset as ds
from pyarrow import RecordBatch
@@ -33,7 +32,7 @@ class FormatPyArrowReader(RecordBatchReader):
"""
def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
- push_down_predicate: pc.Expression | bool, batch_size: int = 4096):
+ push_down_predicate: Any, batch_size: int = 4096):
self.dataset = ds.dataset(file_path, format=file_format, filesystem=file_io.filesystem)
self.reader = self.dataset.scanner(
columns=read_fields,
@@ -49,5 +48,4 @@ class FormatPyArrowReader(RecordBatchReader):
def close(self):
if self.reader is not None:
- self.reader.close()
self.reader = None
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 621549d832..b5f7a7b765 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,11 +15,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
-from typing import Iterator, List, Optional
+from typing import Iterator, List, Optional, Any
import pandas
import pyarrow
-import pyarrow.compute as pc
from pypaimon.common.predicate import Predicate
from pypaimon.common.predicate_builder import PredicateBuilder
@@ -55,10 +54,10 @@ class TableRead:
return _record_generator()
- def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.RecordBatchReader:
+ def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
batch_iterator = self._arrow_batch_generator(splits, schema)
- return pyarrow.RecordBatchReader.from_batches(schema, batch_iterator)
+ return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)
def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]:
batch_reader = self.to_arrow_batch_reader(splits)
@@ -109,7 +108,7 @@ class TableRead:
return ray.data.from_arrow(self.to_arrow(splits))
- def _push_down_predicate(self) -> pc.Expression | bool:
+ def _push_down_predicate(self) -> Any:
if self.predicate is None:
return None
elif self.table.is_primary_key_table:
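Note on the table_read.py changes above: the change suggests the PyArrow release available under Python 3.6 only exposes RecordBatchReader via pyarrow.ipc, not at the package top level, so both the annotation and the from_batches call are qualified accordingly. A small usage sketch:

    import pyarrow as pa

    schema = pa.schema([('x', pa.int64())])
    batches = iter([pa.RecordBatch.from_arrays([pa.array([1, 2])], schema=schema)])

    # pyarrow.ipc.RecordBatchReader works on both old and new releases;
    # newer releases also alias it at the package top level.
    reader = pa.ipc.RecordBatchReader.from_batches(schema, batches)
    table = reader.read_all()  # pyarrow.Table with column x = [1, 2]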
diff --git a/paimon-python/pypaimon/tests/py36/__init__.py b/paimon-python/pypaimon/tests/py36/__init__.py
new file mode 100644
index 0000000000..53ed4d36c2
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/__init__.py
@@ -0,0 +1,17 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
new file mode 100644
index 0000000000..ffa305240b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -0,0 +1,247 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import tempfile
+import unittest
+
+import pandas as pd
+import pyarrow as pa
+
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.schema import Schema
+from pypaimon.tests.predicates_test import _random_format, _check_filtered_result
+
+
+class PredicatePy36Test(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.tempdir = tempfile.mkdtemp()
+ cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+ cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+ cls.catalog.create_database('default', False)
+ pa_schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string()),
+ ])
+ cls.catalog.create_table('default.test_append', Schema.from_pyarrow_schema(
+ pa_schema, options={'file.format': _random_format()}), False)
+
+ df = pd.DataFrame({
+ 'f0': [1, 2, 3, 4, 5],
+ 'f1': ['abc', 'abbc', 'bc', 'd', None],
+ })
+
+ append_table = cls.catalog.get_table('default.test_append')
+ write_builder = append_table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_pandas(df)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ cls.df = df
+
+ def testWrongFieldName(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ with self.assertRaises(ValueError) as e:
+ predicate_builder.equal('f2', 'a')
+ self.assertEqual(str(e.exception), "The field f2 is not in field list ['f0', 'f1'].")
+
+ def testAppendWithDuplicate(self):
+ pa_schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string()),
+ ])
+ self.catalog.create_table('default.test_append_with_duplicate', Schema.from_pyarrow_schema(pa_schema), False)
+
+ df = pd.DataFrame({
+ 'f0': [1, 1, 2, 2],
+ 'f1': ['a', 'b', 'c', 'd'],
+ })
+
+ table = self.catalog.get_table('default.test_append_with_duplicate')
+ write_builder = table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+ write.write_pandas(df)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+
+ predicate = predicate_builder.equal('f0', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[0:1])
+
+ predicate = predicate_builder.equal('f0', 0)
+ read_builder = table.new_read_builder().with_filter(predicate)
+ scan = read_builder.new_scan()
+ read = read_builder.new_read()
+ actual_df = read.to_pandas(scan.plan().splits())
+ self.assertEqual(len(actual_df), 0)
+
+ def testAllFieldTypesWithEqual(self):
+ pa_schema = pa.schema([
+ # int
+ ('_tinyint', pa.int8()),
+ ('_smallint', pa.int16()),
+ ('_int', pa.int32()),
+ ('_bigint', pa.int64()),
+ # float
+ ('_float16', pa.float32()), # NOTE: cannot write pa.float16() data into Paimon
+ ('_float32', pa.float32()),
+ ('_double', pa.float64()),
+ # string
+ ('_string', pa.string()),
+ # bool
+ ('_boolean', pa.bool_())
+ ])
+ self.catalog.create_table('default.test_all_field_types',
+ Schema.from_pyarrow_schema(pa_schema, options={'file.format': _random_format()}),
+ False)
+ table = self.catalog.get_table('default.test_all_field_types')
+ write_builder = table.new_batch_write_builder()
+ write = write_builder.new_write()
+ commit = write_builder.new_commit()
+
+ df = pd.DataFrame({
+ '_tinyint': pd.Series([1, 2], dtype='int8'),
+ '_smallint': pd.Series([10, 20], dtype='int16'),
+ '_int': pd.Series([100, 200], dtype='int32'),
+ '_bigint': pd.Series([1000, 2000], dtype='int64'),
+ '_float16': pd.Series([1.0, 2.0], dtype='float32'),
+ '_float32': pd.Series([1.00, 2.00], dtype='float32'),
+ '_double': pd.Series([1.000, 2.000], dtype='double'),
+ '_string': pd.Series(['A', 'B'], dtype='object'),
+ '_boolean': [True, False]
+ })
+ record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
+
+ write.write_arrow_batch(record_batch)
+ commit.commit(write.prepare_commit())
+ write.close()
+ commit.close()
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+
+ predicate = predicate_builder.equal('_tinyint', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_smallint', 20)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_int', 100)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_bigint', 2000)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_float16', 1.0)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_float32', 2.00)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_double', 1.000)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ predicate = predicate_builder.equal('_string', 'B')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[1]])
+
+ predicate = predicate_builder.equal('_boolean', True)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), df.loc[[0]])
+
+ def testNotEqualAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.not_equal('f0', 1)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:4])
+
+ def testLessThanAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.less_than('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testLessOrEqualAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.less_or_equal('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testGreaterThanAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.greater_than('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[3:4])
+
+ def testGreaterOrEqualAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.greater_or_equal('f0', 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+ def testIsNullAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_null('f1')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[[4]])
+
+ def testIsNotNullAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_not_null('f1')
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:3])
+
+ def testIsInAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_in('f0', [1, 2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:1])
+
+ def testIsNotInAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.is_not_in('f0', [1, 2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[2:4])
+
+ def testBetweenAppend(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate = predicate_builder.between('f0', 1, 3)
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[0:2])
+
+ def testAndPredicates(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate1 = predicate_builder.greater_than('f0', 1)
+ predicate2 = predicate_builder.less_than('f0', 4)
+ predicate = predicate_builder.and_predicates([predicate1, predicate2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate), self.df.loc[1:2])
+
+ def testOrPredicates(self):
+ table = self.catalog.get_table('default.test_append')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ predicate1 = predicate_builder.greater_than('f0', 3)
+ predicate2 = predicate_builder.less_than('f0', 2)
+ predicate = predicate_builder.or_predicates([predicate1, predicate2])
+ _check_filtered_result(table.new_read_builder().with_filter(predicate),
+ self.df.loc[[0, 3, 4]])
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
new file mode 100644
index 0000000000..e27b681e90
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -0,0 +1,115 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import pyarrow as pa
+
+from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+from pypaimon.schema.schema import Schema
+from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+
+
+class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
+
+ def testParquetAppendOnlyReader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.rest_catalog.create_table('default.test_append_only_parquet', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_parquet')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+ self.assertEqual(actual, self.expected)
+
+ def testOrcAppendOnlyReader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'orc'})
+ self.rest_catalog.create_table('default.test_append_only_orc', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_orc')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+ self.assertEqual(actual, self.expected)
+
+ def testAvroAppendOnlyReader(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
+ self.rest_catalog.create_table('default.test_append_only_avro', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_avro')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder()
+ actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+ self.assertEqual(actual, self.expected)
+
+ def testAppendOnlyReaderWithFilter(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.rest_catalog.create_table('default.test_append_only_filter', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_filter')
+ self._write_test_table(table)
+
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ p1 = predicate_builder.less_than('user_id', 7)
+ p2 = predicate_builder.greater_or_equal('user_id', 2)
+ p3 = predicate_builder.between('user_id', 0, 6) # [2/b, 3/c, 4/d, 5/e, 6/f] left
+ p4 = predicate_builder.is_not_in('behavior', ['b', 'e']) # [3/c, 4/d, 6/f] left
+ p5 = predicate_builder.is_in('dt', ['p1']) # exclude 3/c
+ p6 = predicate_builder.is_not_null('behavior') # exclude 4/d
+ g1 = predicate_builder.and_predicates([p1, p2, p3, p4, p5, p6])
+ read_builder = table.new_read_builder().with_filter(g1)
+ actual = self._read_test_table(read_builder)
+ expected = pa.concat_tables([
+ self.expected.slice(5, 1) # 6/f
+ ])
+ self.assertEqual(table_sort_by(actual, 'user_id'), expected)
+
+ p7 = predicate_builder.startswith('behavior', 'a')
+ p8 = predicate_builder.equal('item_id', 1002)
+ p9 = predicate_builder.is_null('behavior')
+ g2 = predicate_builder.or_predicates([p7, p8, p9])
+ read_builder = table.new_read_builder().with_filter(g2)
+ actual = self._read_test_table(read_builder)
+ expected = pa.concat_tables([
+ self.expected.slice(0, 1), # 1/a
+ self.expected.slice(1, 1), # 2/b
+ self.expected.slice(3, 1), # 5/e
+ ])
+ self.assertEqual(table_sort_by(actual, 'user_id'), expected)
+
+ # Same as java, 'not_equal' will also filter records of 'None' value
+ p12 = predicate_builder.not_equal('behavior', 'f')
+ read_builder = table.new_read_builder().with_filter(p12)
+ actual = self._read_test_table(read_builder)
+ expected = pa.concat_tables([
+ # not only 6/f, but also 4/d will be filtered
+ self.expected.slice(0, 1), # 1/a
+ self.expected.slice(1, 1), # 2/b
+ self.expected.slice(2, 1), # 3/c
+ self.expected.slice(4, 1), # 5/e
+ self.expected.slice(6, 1), # 7/g
+ self.expected.slice(7, 1), # 8/h
+ ])
+ self.assertEqual(table_sort_by(actual, 'user_id'), expected)
+
+ def testAppendOnlyReaderWithProjection(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.rest_catalog.create_table('default.test_append_only_projection', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_projection')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder().with_projection(['dt', 'user_id'])
+ actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+ expected = self.expected.select(['dt', 'user_id'])
+ self.assertEqual(actual, expected)
diff --git a/paimon-python/pypaimon/tests/py36/pyarrow_compat.py b/paimon-python/pypaimon/tests/py36/pyarrow_compat.py
new file mode 100644
index 0000000000..d2cc40867a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/pyarrow_compat.py
@@ -0,0 +1,40 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+"""
+PyArrow compatibility module for Python 3.6 and PyArrow 5.0.0
+"""
+
+import pyarrow as pa
+import pyarrow.compute as pc
+
+
+def sort_table_by_column(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
+ """
+ Sort a PyArrow Table by a column name.
+ This function provides compatibility for PyArrow 5.0.0 which doesn't have Table.sort_by method.
+ """
+ if hasattr(table, 'sort_by'):
+ # PyArrow >= 6.0 has native sort_by method
+ return table.sort_by(column_name)
+ else:
+ # PyArrow 5.0 compatibility using sort_indices and take
+ indices = pc.sort_indices(table, sort_keys=[(column_name, order)])
+ return table.take(indices)
+
+
+table_sort_by = sort_table_by_column
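The helper above backfills Table.sort_by (native only from PyArrow 6.0, per the comment in the module) with compute.sort_indices plus Table.take. A typical call from the py36 tests looks like this:

    import pyarrow as pa
    from pypaimon.tests.py36.pyarrow_compat import table_sort_by

    t = pa.table({'user_id': [3, 1, 2], 'behavior': ['c', 'a', 'b']})
    ordered = table_sort_by(t, 'user_id')
    # rows come back ordered by user_id (1, 2, 3) on PyArrow 5.0 and newer alike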
diff --git a/paimon-python/pypaimon/tests/rest_server.py b/paimon-python/pypaimon/tests/rest_server.py
index 8804a4f5e2..1419faeb13 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -717,7 +717,7 @@ class RESTCatalogServer:
"""Add no permission table"""
self.no_permission_tables.append(identifier.get_full_name())
- def mock_database(self, name: str, options: dict[str, str]) -> GetDatabaseResponse:
+ def mock_database(self, name: str, options: Dict[str, str]) -> GetDatabaseResponse:
return GetDatabaseResponse(
id=str(uuid.uuid4()),
name=name,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py b/paimon-python/pypaimon/write/writer/data_writer.py
index bc4553847c..4a197c6919 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -90,7 +90,7 @@ class DataWriter(ABC):
if self.pending_data is None:
return
- current_size = self.pending_data.get_total_buffer_size()
+ current_size = self.pending_data.nbytes
if current_size > self.target_file_size:
split_row = self._find_optimal_split_point(self.pending_data, self.target_file_size)
if split_row > 0:
@@ -116,7 +116,9 @@ class DataWriter(ABC):
raise ValueError(f"Unsupported file format: {self.file_format}")
# min key & max key
- key_columns_batch = data.select(self.trimmed_primary_key)
+ table = pa.Table.from_batches([data])
+ selected_table = table.select(self.trimmed_primary_key)
+ key_columns_batch = selected_table.to_batches()[0]
min_key_row_batch = key_columns_batch.slice(0, 1)
max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows - 1, 1)
min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns]
@@ -192,7 +194,7 @@ class DataWriter(ABC):
while left <= right:
mid = (left + right) // 2
slice_data = data.slice(0, mid)
- slice_size = slice_data.get_total_buffer_size()
+ slice_size = slice_data.nbytes
if slice_size <= target_size:
best_split = mid
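Note on the data_writer.py changes above: the change implies that RecordBatch.select and get_total_buffer_size are not available on the older PyArrow used with Python 3.6, so key columns are projected through a Table round-trip and sizes are estimated with nbytes instead. A standalone sketch of the same projection (names here are illustrative, not from the patch):

    import pyarrow as pa

    batch = pa.RecordBatch.from_arrays(
        [pa.array([1, 2, 3]), pa.array(['a', 'b', 'c'])], names=['k', 'v'])

    # Project columns without RecordBatch.select by going through a Table.
    key_batch = pa.Table.from_batches([batch]).select(['k']).to_batches()[0]

    approx_size = batch.nbytes  # buffer-size estimate used when splitting files
    first_key = [col.to_pylist()[0] for col in key_batch.slice(0, 1).columns]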