This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 624bfda094 [python] Support reading and writing append table under python 3.6 (#6194)
624bfda094 is described below

commit 624bfda094f3265be9bc0ac652182fcf364ec0fc
Author: umi <[email protected]>
AuthorDate: Thu Sep 4 21:52:09 2025 +0800

    [python] Support reading and writing append table under python 3.6 (#6194)
---
 paimon-python/dev/lint-python.sh                   |  19 +-
 paimon-python/pypaimon/common/file_io.py           |  37 ++-
 paimon-python/pypaimon/common/json_util.py         |  14 +-
 paimon-python/pypaimon/common/predicate.py         |  36 ++-
 paimon-python/pypaimon/read/push_down_utils.py     |   3 +-
 .../pypaimon/read/reader/format_avro_reader.py     |   5 +-
 .../pypaimon/read/reader/format_pyarrow_reader.py  |   6 +-
 paimon-python/pypaimon/read/table_read.py          |   9 +-
 paimon-python/pypaimon/tests/py36/__init__.py      |  17 ++
 .../pypaimon/tests/py36/ao_predicate_test.py       | 247 +++++++++++++++++++++
 .../pypaimon/tests/py36/ao_read_write_test.py      | 115 ++++++++++
 .../pypaimon/tests/py36/pyarrow_compat.py          |  40 ++++
 paimon-python/pypaimon/tests/rest_server.py        |   2 +-
 paimon-python/pypaimon/write/writer/data_writer.py |   8 +-
 14 files changed, 514 insertions(+), 44 deletions(-)

diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index ae214d35f6..9cbb612137 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -162,16 +162,28 @@ function flake8_check() {
 
 # Pytest check
 function pytest_check() {
-
     print_function "STAGE" "pytest checks"
     if [ ! -f "$PYTEST_PATH" ]; then
         echo "For some unknown reasons, the pytest package is not complete."
     fi
 
+    # Get Python version
+    PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
+    echo "Detected Python version: $PYTHON_VERSION"
+
+    # Determine test directory based on Python version
+    if [ "$PYTHON_VERSION" = "3.6" ]; then
+        TEST_DIR="pypaimon/tests/py36"
+        echo "Running tests for Python 3.6: $TEST_DIR"
+    else
+        TEST_DIR="pypaimon/tests --ignore=pypaimon/tests/py36"
+        echo "Running tests for Python $PYTHON_VERSION (excluding py36): pypaimon/tests --ignore=pypaimon/tests/py36"
+    fi
+
     # the return value of a pipeline is the status of the last command to exit
     # with a non-zero status or zero if no command exited with a non-zero status
     set -o pipefail
-    ($PYTEST_PATH) 2>&1 | tee -a $LOG_FILE
+    ($PYTEST_PATH $TEST_DIR) 2>&1 | tee -a $LOG_FILE
 
     PYCODESTYLE_STATUS=$?
     if [ $PYCODESTYLE_STATUS -ne 0 ]; then
@@ -264,5 +276,4 @@ done
 # collect checks according to the options
 collect_checks
 # run checks
-check_stage
-
+check_stage
\ No newline at end of file
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index f1b40e2239..8830a6a537 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -27,7 +27,6 @@ import pyarrow
 from pyarrow._fs import FileSystem
 
 from pypaimon.common.config import OssOptions, S3Options
-from pypaimon.schema.data_types import PyarrowFieldParser
 
 
 class FileIO:
@@ -288,10 +287,10 @@ class FileIO:
     def write_parquet(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'snappy', **kwargs):
         try:
             import pyarrow.parquet as pq
+            table = pyarrow.Table.from_batches([data])
 
             with self.new_output_stream(path) as output_stream:
-                with pq.ParquetWriter(output_stream, data.schema, compression=compression, **kwargs) as pw:
-                    pw.write_batch(data)
+                pq.write_table(table, output_stream, compression=compression, **kwargs)
 
         except Exception as e:
             self.delete_quietly(path)
@@ -299,15 +298,22 @@ class FileIO:
 
     def write_orc(self, path: Path, data: pyarrow.RecordBatch, compression: str = 'zstd', **kwargs):
         try:
+            """Write ORC file using PyArrow ORC writer."""
+            import sys
             import pyarrow.orc as orc
             table = pyarrow.Table.from_batches([data])
+
             with self.new_output_stream(path) as output_stream:
-                orc.write_table(
-                    table,
-                    output_stream,
-                    compression=compression,
-                    **kwargs
-                )
+                # Check Python version - if 3.6, don't use compression parameter
+                if sys.version_info[:2] == (3, 6):
+                    orc.write_table(table, output_stream, **kwargs)
+                else:
+                    orc.write_table(
+                        table,
+                        output_stream,
+                        compression=compression,
+                        **kwargs
+                    )
 
         except Exception as e:
             self.delete_quietly(path)
@@ -315,9 +321,18 @@ class FileIO:
 
     def write_avro(self, path: Path, data: pyarrow.RecordBatch, avro_schema: Optional[Dict[str, Any]] = None, **kwargs):
         import fastavro
-
         if avro_schema is None:
+            from pypaimon.schema.data_types import PyarrowFieldParser
             avro_schema = PyarrowFieldParser.to_avro_schema(data.schema)
-        records = data.to_pylist()
+
+        records_dict = data.to_pydict()
+
+        def record_generator():
+            num_rows = len(list(records_dict.values())[0])
+            for i in range(num_rows):
+                yield {col: records_dict[col][i] for col in records_dict.keys()}
+
+        records = record_generator()
+
         with self.new_output_stream(path) as output_stream:
             fastavro.writer(output_stream, avro_schema, records, **kwargs)
diff --git a/paimon-python/pypaimon/common/json_util.py b/paimon-python/pypaimon/common/json_util.py
index 2f007cd230..5f4d513a27 100644
--- a/paimon-python/pypaimon/common/json_util.py
+++ b/paimon-python/pypaimon/common/json_util.py
@@ -17,7 +17,7 @@
 
 import json
 from dataclasses import field, fields, is_dataclass
-from typing import Any, Dict, Type, TypeVar, Union, get_args, get_origin
+from typing import Any, Dict, Type, TypeVar, Union, List
 
 T = TypeVar("T")
 
@@ -86,14 +86,14 @@ class JSON:
         for field_info in fields(target_class):
             json_name = field_info.metadata.get("json_name", field_info.name)
             field_mapping[json_name] = field_info.name
-            origin_type = get_origin(field_info.type)
-            args = get_args(field_info.type)
+            origin_type = getattr(field_info.type, '__origin__', None)
+            args = getattr(field_info.type, '__args__', None)
             field_type = field_info.type
             if origin_type is Union and len(args) == 2:
                 field_type = args[0]
             if is_dataclass(field_type):
                 type_mapping[json_name] = field_type
-            elif origin_type is list and is_dataclass(args[0]):
+            elif origin_type in (list, List) and is_dataclass(args[0]):
                 type_mapping[json_name] = field_info.type
 
         # Map JSON data to field names
@@ -102,9 +102,9 @@ class JSON:
             if json_name in field_mapping:
                 field_name = field_mapping[json_name]
                 if json_name in type_mapping:
-                    tp = get_origin(type_mapping[json_name])
-                    if tp is list:
-                        item_type = get_args(type_mapping[json_name])[0]
+                    tp = getattr(type_mapping[json_name], '__origin__', None)
+                    if tp in (list, List):
+                        item_type = getattr(type_mapping[json_name], '__args__', None)[0]
                         if is_dataclass(item_type):
                             kwargs[field_name] = [
                                 item_type.from_dict(item)
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index ba56713032..ce4172148c 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -15,7 +15,6 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from __future__ import annotations
 
 from dataclasses import dataclass
 from functools import reduce
@@ -183,7 +182,7 @@ class Predicate:
         else:
             raise ValueError("Unsupported predicate method: {}".format(self.method))
 
-    def to_arrow(self) -> pyarrow_compute.Expression | bool:
+    def to_arrow(self) -> Any:
         if self.method == 'equal':
             return pyarrow_dataset.field(self.field) == self.literals[0]
         elif self.method == 'notEqual':
@@ -206,13 +205,40 @@ class Predicate:
             return ~pyarrow_dataset.field(self.field).isin(self.literals)
         elif self.method == 'startsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.starts_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility - improved approach
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.starts_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to True
+                return True
         elif self.method == 'endsWith':
             pattern = self.literals[0]
-            return pyarrow_compute.ends_with(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.ends_with(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to True
+                return True
         elif self.method == 'contains':
             pattern = self.literals[0]
-            return pyarrow_compute.match_substring(pyarrow_dataset.field(self.field).cast(pyarrow.string()), pattern)
+            # For PyArrow compatibility
+            try:
+                field_ref = pyarrow_dataset.field(self.field)
+                # Ensure the field is cast to string type
+                string_field = field_ref.cast(pyarrow.string())
+                result = pyarrow_compute.match_substring(string_field, pattern)
+                return result
+            except Exception:
+                # Fallback to True
+                return True
         elif self.method == 'between':
             return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \
                 (pyarrow_dataset.field(self.field) <= self.literals[1])
diff --git a/paimon-python/pypaimon/read/push_down_utils.py b/paimon-python/pypaimon/read/push_down_utils.py
index 31e66973c6..95a99d9005 100644
--- a/paimon-python/pypaimon/read/push_down_utils.py
+++ b/paimon-python/pypaimon/read/push_down_utils.py
@@ -64,7 +64,8 @@ def extract_predicate_to_dict(result: Dict, input_predicate: 'Predicate', keys:
             return
         # condition: only one key for 'or', and the key belongs to keys
         involved_fields = {p.field for p in input_predicate.literals}
-        if len(involved_fields) == 1 and (field := involved_fields.pop()) in keys:
+        field = involved_fields.pop() if len(involved_fields) == 1 else None
+        if field is not None and field in keys:
             result[field].append(input_predicate)
         return
 
diff --git a/paimon-python/pypaimon/read/reader/format_avro_reader.py b/paimon-python/pypaimon/read/reader/format_avro_reader.py
index 4ce7c04ed4..a9ca01a6e5 100644
--- a/paimon-python/pypaimon/read/reader/format_avro_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_avro_reader.py
@@ -16,11 +16,10 @@
 # limitations under the License.
 ################################################################################
 
-from typing import List, Optional
+from typing import List, Optional, Any
 
 import fastavro
 import pyarrow as pa
-import pyarrow.compute as pc
 import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
@@ -36,7 +35,7 @@ class FormatAvroReader(RecordBatchReader):
     """
 
     def __init__(self, file_io: FileIO, file_path: str, read_fields: List[str], full_fields: List[DataField],
-                 push_down_predicate: pc.Expression | bool, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 4096):
         self._file = file_io.filesystem.open_input_file(file_path)
         self._avro_reader = fastavro.reader(self._file)
         self._batch_size = batch_size
diff --git a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
index ecef589391..d6c403b0f9 100644
--- a/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
+++ b/paimon-python/pypaimon/read/reader/format_pyarrow_reader.py
@@ -16,9 +16,8 @@
 # limitations under the License.
 ################################################################################
 
-from typing import List, Optional
+from typing import List, Optional, Any
 
-import pyarrow.compute as pc
 import pyarrow.dataset as ds
 from pyarrow import RecordBatch
 
@@ -33,7 +32,7 @@ class FormatPyArrowReader(RecordBatchReader):
     """
 
     def __init__(self, file_io: FileIO, file_format: str, file_path: str, read_fields: List[str],
-                 push_down_predicate: pc.Expression | bool, batch_size: int = 4096):
+                 push_down_predicate: Any, batch_size: int = 4096):
         self.dataset = ds.dataset(file_path, format=file_format, filesystem=file_io.filesystem)
         self.reader = self.dataset.scanner(
             columns=read_fields,
@@ -49,5 +48,4 @@ class FormatPyArrowReader(RecordBatchReader):
 
     def close(self):
         if self.reader is not None:
-            self.reader.close()
             self.reader = None
diff --git a/paimon-python/pypaimon/read/table_read.py b/paimon-python/pypaimon/read/table_read.py
index 621549d832..b5f7a7b765 100644
--- a/paimon-python/pypaimon/read/table_read.py
+++ b/paimon-python/pypaimon/read/table_read.py
@@ -15,11 +15,10 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-from typing import Iterator, List, Optional
+from typing import Iterator, List, Optional, Any
 
 import pandas
 import pyarrow
-import pyarrow.compute as pc
 
 from pypaimon.common.predicate import Predicate
 from pypaimon.common.predicate_builder import PredicateBuilder
@@ -55,10 +54,10 @@ class TableRead:
 
         return _record_generator()
 
-    def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.RecordBatchReader:
+    def to_arrow_batch_reader(self, splits: List[Split]) -> pyarrow.ipc.RecordBatchReader:
         schema = PyarrowFieldParser.from_paimon_schema(self.read_type)
         batch_iterator = self._arrow_batch_generator(splits, schema)
-        return pyarrow.RecordBatchReader.from_batches(schema, batch_iterator)
+        return pyarrow.ipc.RecordBatchReader.from_batches(schema, batch_iterator)
 
     def to_arrow(self, splits: List[Split]) -> Optional[pyarrow.Table]:
         batch_reader = self.to_arrow_batch_reader(splits)
@@ -109,7 +108,7 @@ class TableRead:
 
         return ray.data.from_arrow(self.to_arrow(splits))
 
-    def _push_down_predicate(self) -> pc.Expression | bool:
+    def _push_down_predicate(self) -> Any:
         if self.predicate is None:
             return None
         elif self.table.is_primary_key_table:
diff --git a/paimon-python/pypaimon/tests/py36/__init__.py b/paimon-python/pypaimon/tests/py36/__init__.py
new file mode 100644
index 0000000000..53ed4d36c2
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/__init__.py
@@ -0,0 +1,17 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
diff --git a/paimon-python/pypaimon/tests/py36/ao_predicate_test.py b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
new file mode 100644
index 0000000000..ffa305240b
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/ao_predicate_test.py
@@ -0,0 +1,247 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+import os
+import tempfile
+import unittest
+
+import pandas as pd
+import pyarrow as pa
+
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.schema.schema import Schema
+from pypaimon.tests.predicates_test import _random_format, 
_check_filtered_result
+
+
+class PredicatePy36Test(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        cls.tempdir = tempfile.mkdtemp()
+        cls.warehouse = os.path.join(cls.tempdir, 'warehouse')
+        cls.catalog = CatalogFactory.create({'warehouse': cls.warehouse})
+        cls.catalog.create_database('default', False)
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+        ])
+        cls.catalog.create_table('default.test_append', 
Schema.from_pyarrow_schema(
+            pa_schema, options={'file.format': _random_format()}), False)
+
+        df = pd.DataFrame({
+            'f0': [1, 2, 3, 4, 5],
+            'f1': ['abc', 'abbc', 'bc', 'd', None],
+        })
+
+        append_table = cls.catalog.get_table('default.test_append')
+        write_builder = append_table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        cls.df = df
+
+    def testWrongFieldName(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        with self.assertRaises(ValueError) as e:
+            predicate_builder.equal('f2', 'a')
+        self.assertEqual(str(e.exception), "The field f2 is not in field list 
['f0', 'f1'].")
+
+    def testAppendWithDuplicate(self):
+        pa_schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string()),
+        ])
+        self.catalog.create_table('default.test_append_with_duplicate', 
Schema.from_pyarrow_schema(pa_schema), False)
+
+        df = pd.DataFrame({
+            'f0': [1, 1, 2, 2],
+            'f1': ['a', 'b', 'c', 'd'],
+        })
+
+        table = self.catalog.get_table('default.test_append_with_duplicate')
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+        write.write_pandas(df)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        predicate = predicate_builder.equal('f0', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[0:1])
+
+        predicate = predicate_builder.equal('f0', 0)
+        read_builder = table.new_read_builder().with_filter(predicate)
+        scan = read_builder.new_scan()
+        read = read_builder.new_read()
+        actual_df = read.to_pandas(scan.plan().splits())
+        self.assertEqual(len(actual_df), 0)
+
+    def testAllFieldTypesWithEqual(self):
+        pa_schema = pa.schema([
+            # int
+            ('_tinyint', pa.int8()),
+            ('_smallint', pa.int16()),
+            ('_int', pa.int32()),
+            ('_bigint', pa.int64()),
+            # float
+            ('_float16', pa.float32()),  # NOTE: cannot write pa.float16() 
data into Paimon
+            ('_float32', pa.float32()),
+            ('_double', pa.float64()),
+            # string
+            ('_string', pa.string()),
+            # bool
+            ('_boolean', pa.bool_())
+        ])
+        self.catalog.create_table('default.test_all_field_types',
+                                  Schema.from_pyarrow_schema(pa_schema, 
options={'file.format': _random_format()}),
+                                  False)
+        table = self.catalog.get_table('default.test_all_field_types')
+        write_builder = table.new_batch_write_builder()
+        write = write_builder.new_write()
+        commit = write_builder.new_commit()
+
+        df = pd.DataFrame({
+            '_tinyint': pd.Series([1, 2], dtype='int8'),
+            '_smallint': pd.Series([10, 20], dtype='int16'),
+            '_int': pd.Series([100, 200], dtype='int32'),
+            '_bigint': pd.Series([1000, 2000], dtype='int64'),
+            '_float16': pd.Series([1.0, 2.0], dtype='float32'),
+            '_float32': pd.Series([1.00, 2.00], dtype='float32'),
+            '_double': pd.Series([1.000, 2.000], dtype='double'),
+            '_string': pd.Series(['A', 'B'], dtype='object'),
+            '_boolean': [True, False]
+        })
+        record_batch = pa.RecordBatch.from_pandas(df, schema=pa_schema)
+
+        write.write_arrow_batch(record_batch)
+        commit.commit(write.prepare_commit())
+        write.close()
+        commit.close()
+
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+
+        predicate = predicate_builder.equal('_tinyint', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_smallint', 20)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_int', 100)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_bigint', 2000)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_float16', 1.0)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_float32', 2.00)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_double', 1.000)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+        predicate = predicate_builder.equal('_string', 'B')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[1]])
+
+        predicate = predicate_builder.equal('_boolean', True)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
df.loc[[0]])
+
+    def testNotEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.not_equal('f0', 1)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:4])
+
+    def testLessThanAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_than('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testLessOrEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.less_or_equal('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testGreaterThanAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_than('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[3:4])
+
+    def testGreaterOrEqualAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.greater_or_equal('f0', 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
+
+    def testIsNullAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_null('f1')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[[4]])
+
+    def testIsNotNullAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_null('f1')
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:3])
+
+    def testIsInAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_in('f0', [1, 2])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:1])
+
+    def testIsNotInAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.is_not_in('f0', [1, 2])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[2:4])
+
+    def testBetweenAppend(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate = predicate_builder.between('f0', 1, 3)
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[0:2])
+
+    def testAndPredicates(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate1 = predicate_builder.greater_than('f0', 1)
+        predicate2 = predicate_builder.less_than('f0', 4)
+        predicate = predicate_builder.and_predicates([predicate1, predicate2])
+        
_check_filtered_result(table.new_read_builder().with_filter(predicate), 
self.df.loc[1:2])
+
+    def testOrPredicates(self):
+        table = self.catalog.get_table('default.test_append')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        predicate1 = predicate_builder.greater_than('f0', 3)
+        predicate2 = predicate_builder.less_than('f0', 2)
+        predicate = predicate_builder.or_predicates([predicate1, predicate2])
+        _check_filtered_result(table.new_read_builder().with_filter(predicate),
+                               self.df.loc[[0, 3, 4]])
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
new file mode 100644
index 0000000000..e27b681e90
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -0,0 +1,115 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+import pyarrow as pa
+
+from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+from pypaimon.schema.schema import Schema
+from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
+
+
+class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
+    """Append-only write/read round trips through the REST catalog, sorted via table_sort_by."""
+
+    def testParquetAppendOnlyReader(self):
+        """Round trip with no 'file.format' option set on the schema."""
+        identifier = 'default.test_append_only_parquet'
+        table_schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table(identifier, table_schema, False)
+        append_table = self.rest_catalog.get_table(identifier)
+        self._write_test_table(append_table)
+
+        rows = self._read_test_table(append_table.new_read_builder())
+        self.assertEqual(table_sort_by(rows, 'user_id'), self.expected)
+
+    def testOrcAppendOnlyReader(self):
+        """Round trip with 'file.format' set to 'orc'."""
+        identifier = 'default.test_append_only_orc'
+        table_schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=['dt'],
+            options={'file.format': 'orc'},
+        )
+        self.rest_catalog.create_table(identifier, table_schema, False)
+        append_table = self.rest_catalog.get_table(identifier)
+        self._write_test_table(append_table)
+
+        rows = self._read_test_table(append_table.new_read_builder())
+        self.assertEqual(table_sort_by(rows, 'user_id'), self.expected)
+
+    def testAvroAppendOnlyReader(self):
+        """Round trip with 'file.format' set to 'avro'."""
+        identifier = 'default.test_append_only_avro'
+        table_schema = Schema.from_pyarrow_schema(
+            self.pa_schema,
+            partition_keys=['dt'],
+            options={'file.format': 'avro'},
+        )
+        self.rest_catalog.create_table(identifier, table_schema, False)
+        append_table = self.rest_catalog.get_table(identifier)
+        self._write_test_table(append_table)
+
+        rows = self._read_test_table(append_table.new_read_builder())
+        self.assertEqual(table_sort_by(rows, 'user_id'), self.expected)
+
+    def testAppendOnlyReaderWithFilter(self):
+        """Read with an AND group, an OR group and a single not_equal predicate."""
+        identifier = 'default.test_append_only_filter'
+        table_schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table(identifier, table_schema, False)
+        append_table = self.rest_catalog.get_table(identifier)
+        self._write_test_table(append_table)
+
+        builder = append_table.new_read_builder().new_predicate_builder()
+
+        # --- conjunction of six predicates ---
+        conjuncts = [
+            builder.less_than('user_id', 7),
+            builder.greater_or_equal('user_id', 2),
+            builder.between('user_id', 0, 6),  # [2/b, 3/c, 4/d, 5/e, 6/f] left
+            builder.is_not_in('behavior', ['b', 'e']),  # [3/c, 4/d, 6/f] left
+            builder.is_in('dt', ['p1']),  # exclude 3/c
+            builder.is_not_null('behavior'),  # exclude 4/d
+        ]
+        and_group = builder.and_predicates(conjuncts)
+        filtered = self._read_test_table(append_table.new_read_builder().with_filter(and_group))
+        wanted = pa.concat_tables([
+            self.expected.slice(5, 1)  # 6/f
+        ])
+        self.assertEqual(table_sort_by(filtered, 'user_id'), wanted)
+
+        # --- disjunction of three predicates ---
+        disjuncts = [
+            builder.startswith('behavior', 'a'),
+            builder.equal('item_id', 1002),
+            builder.is_null('behavior'),
+        ]
+        or_group = builder.or_predicates(disjuncts)
+        filtered = self._read_test_table(append_table.new_read_builder().with_filter(or_group))
+        # offsets into self.expected: 0 -> 1/a, 1 -> 2/b, 3 -> 5/e
+        wanted = pa.concat_tables([self.expected.slice(offset, 1) for offset in (0, 1, 3)])
+        self.assertEqual(table_sort_by(filtered, 'user_id'), wanted)
+
+        # Same as java, 'not_equal' will also filter records of 'None' value
+        not_f = builder.not_equal('behavior', 'f')
+        filtered = self._read_test_table(append_table.new_read_builder().with_filter(not_f))
+        # not only 6/f, but also 4/d will be filtered
+        # offsets into self.expected: 0 -> 1/a, 1 -> 2/b, 2 -> 3/c, 4 -> 5/e, 6 -> 7/g, 7 -> 8/h
+        wanted = pa.concat_tables([self.expected.slice(offset, 1) for offset in (0, 1, 2, 4, 6, 7)])
+        self.assertEqual(table_sort_by(filtered, 'user_id'), wanted)
+
+    def testAppendOnlyReaderWithProjection(self):
+        """Read only the 'dt' and 'user_id' columns and compare with the same selection of self.expected."""
+        identifier = 'default.test_append_only_projection'
+        projected_columns = ['dt', 'user_id']
+        table_schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table(identifier, table_schema, False)
+        append_table = self.rest_catalog.get_table(identifier)
+        self._write_test_table(append_table)
+
+        projecting_builder = append_table.new_read_builder().with_projection(projected_columns)
+        rows = table_sort_by(self._read_test_table(projecting_builder), 'user_id')
+        self.assertEqual(rows, self.expected.select(['dt', 'user_id']))
diff --git a/paimon-python/pypaimon/tests/py36/pyarrow_compat.py 
b/paimon-python/pypaimon/tests/py36/pyarrow_compat.py
new file mode 100644
index 0000000000..d2cc40867a
--- /dev/null
+++ b/paimon-python/pypaimon/tests/py36/pyarrow_compat.py
@@ -0,0 +1,40 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+"""
+"""
+PyArrow compatibility module for Python 3.6 and PyArrow 5.0.0
+"""
+
+import pyarrow as pa
+import pyarrow.compute as pc
+
+
+def sort_table_by_column(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
+    """
+    Sort a PyArrow Table by a single column.
+
+    Uses Table.sort_by when the installed PyArrow provides it; otherwise (e.g. PyArrow 5.0.0)
+    falls back to compute.sort_indices followed by Table.take.
+
+    :param table: table to sort
+    :param column_name: name of the column to sort on
+    :param order: sort direction, 'ascending' (default) or 'descending'
+    :return: the sorted table
+    """
+    sort_keys = [(column_name, order)]
+    if hasattr(table, 'sort_by'):
+        # Pass the explicit (name, order) key: a bare column name always sorts
+        # ascending, which would silently ignore ``order`` on this path.
+        return table.sort_by(sort_keys)
+    # PyArrow 5.0 compatibility using sort_indices and take
+    indices = pc.sort_indices(table, sort_keys=sort_keys)
+    return table.take(indices)
+
+
+table_sort_by = sort_table_by_column  # short alias; this is the name the py36 tests import
diff --git a/paimon-python/pypaimon/tests/rest_server.py 
b/paimon-python/pypaimon/tests/rest_server.py
index 8804a4f5e2..1419faeb13 100644
--- a/paimon-python/pypaimon/tests/rest_server.py
+++ b/paimon-python/pypaimon/tests/rest_server.py
@@ -717,7 +717,7 @@ class RESTCatalogServer:
         """Add no permission table"""
         self.no_permission_tables.append(identifier.get_full_name())
 
-    def mock_database(self, name: str, options: dict[str, str]) -> 
GetDatabaseResponse:
+    def mock_database(self, name: str, options: Dict[str, str]) -> 
GetDatabaseResponse:
         return GetDatabaseResponse(
             id=str(uuid.uuid4()),
             name=name,
diff --git a/paimon-python/pypaimon/write/writer/data_writer.py 
b/paimon-python/pypaimon/write/writer/data_writer.py
index bc4553847c..4a197c6919 100644
--- a/paimon-python/pypaimon/write/writer/data_writer.py
+++ b/paimon-python/pypaimon/write/writer/data_writer.py
@@ -90,7 +90,7 @@ class DataWriter(ABC):
         if self.pending_data is None:
             return
 
-        current_size = self.pending_data.get_total_buffer_size()
+        current_size = self.pending_data.nbytes
         if current_size > self.target_file_size:
             split_row = self._find_optimal_split_point(self.pending_data, 
self.target_file_size)
             if split_row > 0:
@@ -116,7 +116,9 @@ class DataWriter(ABC):
             raise ValueError(f"Unsupported file format: {self.file_format}")
 
         # min key & max key
-        key_columns_batch = data.select(self.trimmed_primary_key)
+        table = pa.Table.from_batches([data])
+        selected_table = table.select(self.trimmed_primary_key)
+        key_columns_batch = selected_table.to_batches()[0]
         min_key_row_batch = key_columns_batch.slice(0, 1)
         max_key_row_batch = key_columns_batch.slice(key_columns_batch.num_rows 
- 1, 1)
         min_key = [col.to_pylist()[0] for col in min_key_row_batch.columns]
@@ -192,7 +194,7 @@ class DataWriter(ABC):
         while left <= right:
             mid = (left + right) // 2
             slice_data = data.slice(0, mid)
-            slice_size = slice_data.get_total_buffer_size()
+            slice_size = slice_data.nbytes
 
             if slice_size <= target_size:
                 best_split = mid


Reply via email to