This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e771c68f1f [python] Fix failing to access DLF under python 3.6 (#6207)
e771c68f1f is described below

commit e771c68f1f2886d4d92436f42dfc2870cf7b86b1
Author: umi <[email protected]>
AuthorDate: Fri Sep 5 19:05:35 2025 +0800

    [python] Fix failing to access DLF under python 3.6 (#6207)
---
 .github/workflows/paimon-python-checks.yml         |  40 ++++-
 .../pypaimon/catalog/rest/rest_catalog.py          |   4 +-
 paimon-python/pypaimon/common/file_io.py           |   5 +-
 paimon-python/pypaimon/common/predicate.py         |   6 +-
 paimon-python/pypaimon/read/table_scan.py          |   4 +-
 paimon-python/pypaimon/table/file_store_table.py   |   6 +-
 .../pypaimon/tests/py36/ao_read_write_test.py      | 172 ++++++++++++++++++++-
 paimon-python/pypaimon/write/row_key_extractor.py  |   2 +-
 paimon-python/setup.py                             |  29 +++-
 9 files changed, 239 insertions(+), 29 deletions(-)

diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 6f6a8a23a8..3e7a05c710 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -20,6 +20,10 @@ name: Check Code Style and Test
 
 on:
   push:
+    paths:
+      - 'paimon-python/**'
+      - '!**/*.md'
+      - '.github/workflows/paimon-python-checks.yml'
   pull_request:
     paths:
       - 'paimon-python/**'
@@ -27,7 +31,7 @@ on:
       - '.github/workflows/paimon-python-checks.yml'
 
 env:
-  PYTHON_VERSION: "3.10"
+  PYTHON_VERSIONS: "['3.6.15', '3.10']"
 
 concurrency:
   group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
@@ -36,18 +40,40 @@ concurrency:
 jobs:
   lint-python:
     runs-on: ubuntu-latest
+    container: "python:${{ matrix.python-version }}-slim"
+    strategy:
+      matrix:
+        python-version: ['3.6.15', '3.10']
 
     steps:
       - name: Checkout code
         uses: actions/checkout@v2
-      - name: Set up Python
-        uses: actions/setup-python@v4
-        with:
-          python-version: ${{ env.PYTHON_VERSION }}
-      - name: Install dependencies
+
+      - name: Install system dependencies
+        shell: bash
+        run: |
+          apt-get update && apt-get install -y \
+            build-essential \
+            git \
+            curl \
+            && rm -rf /var/lib/apt/lists/*
+
+      - name: Verify Python version
+        run: python --version
+
+      - name: Install Python dependencies
+        shell: bash
         run: |
-          python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
+          if [[ "${{ matrix.python-version }}" == "3.6.15" ]]; then
+            python -m pip install --upgrade pip==21.3.1
+            python --version
+            python -m pip install -q readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests 2>&1 >/dev/null
+          else
+            python -m pip install --upgrade pip
+            python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
+          fi
       - name: Run lint-python.sh
+        shell: bash
         run: |
           chmod +x paimon-python/dev/lint-python.sh
           ./paimon-python/dev/lint-python.sh
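
For readers following the workflow change: the run step above branches the dependency pins on the matrix interpreter. A minimal Python sketch of the same split (pin lists abridged from the step above; the snippet is illustrative and not part of the commit):

    # Choose dependency pins by interpreter version, mirroring the CI branch:
    # Python 3.6 gets the legacy stack, newer interpreters the current one.
    import sys

    PY36_PINS = ["pyarrow==6.0.1", "pandas==1.1.5", "fsspec==2021.10.1",
                 "ossfs==2021.8.0", "polars==0.9.12", "zstandard==0.19.0"]
    CURRENT_PINS = ["pyarrow==16.0.0", "pandas==2.0.3", "fsspec==2024.3.1",
                    "ossfs==2023.12.0", "polars==1.32.0", "zstandard==0.24.0"]

    pins = PY36_PINS if sys.version_info < (3, 7) else CURRENT_PINS
    print(" ".join(pins))
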
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 31cafb2578..f203169fbd 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -205,8 +205,10 @@ class RESTCatalog(Catalog):
         )
         path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
         path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
+        table_path = path_parsed.netloc + "/" + path_parsed.path \
+            if path_parsed.scheme == "file" else path_parsed.path[1:]
         table = self.create(data_file_io(path),
-                            Path(path_parsed.netloc + "/" + path_parsed.path),
+                            Path(table_path),
                             schema,
                             catalog_env)
         return table
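
The table_path fix hinges on how urlparse splits a table location: for object-store schemes the bucket lands in netloc, so only path[1:] (the bucket-relative key) is kept once the bucket moves into the filesystem endpoint (see the file_io.py change below). A quick illustration with a hypothetical location:

    from urllib.parse import urlparse

    loc = urlparse("oss://my-bucket/warehouse/default.db/t")
    print(loc.scheme, loc.netloc, loc.path)  # oss my-bucket /warehouse/default.db/t
    print(loc.path[1:])                      # warehouse/default.db/t (bucket-relative)
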
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 8830a6a537..c98322543e 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -57,14 +57,13 @@ class FileIO:
 
     def _initialize_oss_fs(self) -> FileSystem:
         from pyarrow.fs import S3FileSystem
-
+        bucket_name = self.properties.get("prefix")
         client_kwargs = {
-            "endpoint_override": self.properties.get(OssOptions.OSS_ENDPOINT),
+            "endpoint_override": bucket_name + "." + 
self.properties.get(OssOptions.OSS_ENDPOINT),
             "access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
             "secret_key": 
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
             "session_token": 
self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
             "region": self.properties.get(OssOptions.OSS_REGION),
-            "force_virtual_addressing": True,
         }
 
         return S3FileSystem(**client_kwargs)
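
pyarrow 6.0.1 (the pin for Python 3.6 above) predates S3FileSystem's force_virtual_addressing option, which is presumably why the commit folds the bucket into the endpoint host instead. A sketch with hypothetical property values:

    # Virtual-hosted-style addressing without force_virtual_addressing:
    # prefix the endpoint host with the bucket name.
    properties = {
        "prefix": "my-bucket",                           # bucket (hypothetical)
        "oss.endpoint": "oss-cn-hangzhou.aliyuncs.com",  # endpoint (hypothetical)
    }
    endpoint = properties["prefix"] + "." + properties["oss.endpoint"]
    print(endpoint)  # my-bucket.oss-cn-hangzhou.aliyuncs.com
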
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index ce4172148c..c8a4070c6a 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -214,7 +214,7 @@ class Predicate:
                 return result
             except Exception:
                 # Fallback to True
-                return True
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         elif self.method == 'endsWith':
             pattern = self.literals[0]
             # For PyArrow compatibility
@@ -226,7 +226,7 @@ class Predicate:
                 return result
             except Exception:
                 # Fallback to True
-                return True
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         elif self.method == 'contains':
             pattern = self.literals[0]
             # For PyArrow compatibility
@@ -238,7 +238,7 @@ class Predicate:
                 return result
             except Exception:
                 # Fallback to True
-                return True
+                return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
         elif self.method == 'between':
             return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \
                 (pyarrow_dataset.field(self.field) <= self.literals[1])
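
The old fallback returned a bare Python True, which presumably cannot be composed with other pyarrow dataset expressions via & or | on the legacy releases pinned for Python 3.6. The replacement matches every row but stays an Expression, since any value is either valid or null. A minimal sketch (field names hypothetical):

    import pyarrow.dataset as ds

    # An always-true dataset Expression: every value is either valid or null.
    always_true = ds.field("name").is_valid() | ds.field("name").is_null()
    # Unlike a bare True, it composes with further filters:
    combined = always_true & (ds.field("age") > 18)
    print(combined)
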
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 2a927c0055..a4d92bb796 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -67,8 +67,8 @@ class TableScan:
         self.idx_of_this_subtask = None
         self.number_of_para_subtasks = None
 
-        self.only_read_real_buckets = True \
-            if (self.table.options.get('bucket', -1) == BucketMode.POSTPONE_BUCKET.value) else False
+        self.only_read_real_buckets = True if int(
+            self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
 
     def plan(self) -> Plan:
         latest_snapshot = self.snapshot_manager.get_latest_snapshot()
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 74aaaff4e6..bcde56a5b2 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -70,9 +70,9 @@ class FileStoreTable(Table):
 
     def bucket_mode(self) -> BucketMode:
         if self.is_primary_key_table:
-            if self.options.get(CoreOptions.BUCKET, -1) == -2:
+            if int(self.options.get(CoreOptions.BUCKET, -1)) == -2:
                 return BucketMode.POSTPONE_MODE
-            elif self.options.get(CoreOptions.BUCKET, -1) == -1:
+            elif int(self.options.get(CoreOptions.BUCKET, -1)) == -1:
                 if self.cross_partition_update:
                     return BucketMode.CROSS_PARTITION
                 else:
@@ -80,7 +80,7 @@ class FileStoreTable(Table):
             else:
                 return BucketMode.HASH_FIXED
         else:
-            if self.options.get(CoreOptions.BUCKET, -1) == -1:
+            if int(self.options.get(CoreOptions.BUCKET, -1)) == -1:
                 return BucketMode.BUCKET_UNAWARE
             else:
                 return BucketMode.HASH_FIXED
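
The int() coercions here, and the matching ones in table_scan.py above and row_key_extractor.py below, guard against the bucket option arriving as a string (as catalog-delivered options presumably can), in which case a comparison against an int sentinel is silently False. A two-line repro with illustrative values:

    options = {"bucket": "-2"}                    # option values may be strings
    print(options.get("bucket", -1) == -2)        # False: '-2' != -2
    print(int(options.get("bucket", -1)) == -2)   # True after coercion
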
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index e27b681e90..575bf535e3 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -15,10 +15,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 """
+import logging
+
+import pandas as pd
 import pyarrow as pa
 
-from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
 from pypaimon.schema.schema import Schema
+from pypaimon.tests.py36.pyarrow_compat import table_sort_by
 from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
 
 
@@ -113,3 +121,165 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
         actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
         expected = self.expected.select(['dt', 'user_id'])
         self.assertEqual(actual, expected)
+
+    def testAvroAppendOnlyReaderWithProjection(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
+        self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False)
+        table = self.rest_catalog.get_table('default.test_avro_append_only_projection')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder().with_projection(['dt', 'user_id'])
+        actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+        expected = self.expected.select(['dt', 'user_id'])
+        self.assertEqual(actual, expected)
+
+    def testAppendOnlyReaderWithLimit(self):
+        schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+        self.rest_catalog.create_table('default.test_append_only_limit', schema, False)
+        table = self.rest_catalog.get_table('default.test_append_only_limit')
+        self._write_test_table(table)
+
+        read_builder = table.new_read_builder().with_limit(1)
+        actual = self._read_test_table(read_builder)
+        # only records from 1st commit (1st split) will be read
+        # might be split of "dt=1" or split of "dt=2"
+        self.assertEqual(actual.num_rows, 4)
+
+    def testWriteWrongSchema(self):
+        self.rest_catalog.create_table('default.test_wrong_schema',
+                                       Schema.from_pyarrow_schema(self.pa_schema),
+                                       False)
+        table = self.rest_catalog.get_table('default.test_wrong_schema')
+
+        data = {
+            'f0': [1, 2, 3],
+            'f1': ['a', 'b', 'c'],
+        }
+        df = pd.DataFrame(data)
+        schema = pa.schema([
+            ('f0', pa.int64()),
+            ('f1', pa.string())
+        ])
+        record_batch = pa.RecordBatch.from_pandas(df, schema)
+
+        write_builder = table.new_batch_write_builder()
+        table_write = write_builder.new_write()
+
+        with self.assertRaises(ValueError) as e:
+            table_write.write_arrow_batch(record_batch)
+        self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema."))
+
+    def testWriteWideTableLargeData(self):
+        logging.basicConfig(level=logging.INFO)
+        catalog = CatalogFactory.create(self.options)
+
+        # Build table structure: 200 data columns + 1 partition column
+        # Create PyArrow schema
+        pa_fields = []
+
+        # Create 200 data columns f0 to f199
+        for i in range(200):
+            pa_fields.append(pa.field(f"f{i}", pa.string(), 
metadata={"description": f"Column f{i}"}))
+
+        # Add partition column dt
+        pa_fields.append(pa.field("dt", pa.string(), metadata={"description": 
"Partition column dt"}))
+
+        # Create PyArrow schema
+        pa_schema = pa.schema(pa_fields)
+
+        # Convert to Paimon Schema and specify partition key
+        schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"])
+
+        # Create table
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        try:
+            # If table already exists, drop it first
+            try:
+                catalog.get_table(table_identifier)
+                catalog.drop_table(table_identifier)
+                print(f"Dropped existing table {table_identifier}")
+            except Exception:
+                # Table does not exist, continue creating
+                pass
+
+            # Create new table
+            catalog.create_table(
+                identifier=table_identifier,
+                schema=schema,
+                ignore_if_exists=False
+            )
+
+            print(
+                f"Successfully created table {table_identifier} with 
{len(pa_fields) - 1} "
+                f"data columns and 1 partition column")
+            print(
+                f"Table schema: {len([f for f in pa_fields if f.name != 
'dt'])} data columns (f0-f199) + dt partition")
+
+        except Exception as e:
+            print(f"Error creating table: {e}")
+            raise e
+        import random
+
+        table_identifier = Identifier.create("default", "wide_table_200cols")
+        table = catalog.get_table(table_identifier)
+
+        total_rows = 500000  # rows of data
+        batch_size = 100000  # 100,000 rows per batch
+        commit_batches = total_rows // batch_size
+
+        for commit_batch in range(commit_batches):
+            start_idx = commit_batch * batch_size
+            end_idx = start_idx + batch_size
+
+            print(f"Processing batch {commit_batch + 1}/{commit_batches} 
({start_idx:,} - {end_idx:,})...")
+            # Generate data for current batch - generate data for all 200 columns
+            data = {}
+            # Generate data for f0-f199
+            for i in range(200):
+                if i == 0:
+                    data[f"f{i}"] = [f'value_{j}' for j in range(start_idx, 
end_idx)]
+                elif i == 1:
+                    data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E']) 
for _ in range(batch_size)]
+                elif i == 2:
+                    data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _ 
in range(batch_size)]
+                elif i == 3:
+                    data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx, 
end_idx)]
+                else:
+                    # Generate random string data for other columns
+                    data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}' 
for _ in range(batch_size)]
+
+            # Add partition column data
+            data['dt'] = ['2025-09-01' for _ in range(batch_size)]
+            # Convert dictionary to PyArrow RecordBatch
+            arrow_batch = pa.RecordBatch.from_pydict(data)
+            # Create new write and commit objects for each commit batch
+            write_builder = table.new_batch_write_builder()
+            table_write = write_builder.new_write()
+            table_commit = write_builder.new_commit()
+
+            try:
+                # Write current batch data
+                table_write.write_arrow_batch(arrow_batch)
+                print("Batch data write completed, committing...")
+                # Commit current batch
+                commit_messages = table_write.prepare_commit()
+                table_commit.commit(commit_messages)
+                print(f"Batch {commit_batch + 1} committed successfully! 
Written {end_idx:,} rows of data")
+
+            finally:
+                # Ensure resource cleanup
+                table_write.close()
+                table_commit.close()
+
+        print(
+            f"All data writing completed! "
+            f"Total written {total_rows:,} rows of data to 200-column wide 
table in {commit_batches} commits")
+        rest_catalog = RESTCatalog(CatalogContext.create_from_options(Options(self.options)))
+        table = rest_catalog.get_table('default.wide_table_200cols')
+        predicate_builder = table.new_read_builder().new_predicate_builder()
+        read_builder = (table.new_read_builder()
+                        .with_projection(['f0', 'f1'])
+                        .with_filter(predicate=predicate_builder.equal("dt", "2025-09-01")))
+        table_read = read_builder.new_read()
+        splits = read_builder.new_scan().plan().splits()
+        self.assertEqual(table_read.to_arrow(splits).num_rows, total_rows)
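
To run just this new Python 3.6 suite, something along these lines should work from the repository root (module path taken from the diff; the invocation itself is illustrative):

    import pytest

    # Run only the py36 append-only read/write tests touched by this commit.
    pytest.main(["paimon-python/pypaimon/tests/py36/ao_read_write_test.py", "-q"])
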
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index aa28aacef2..c1b7ebf7d0 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -134,7 +134,7 @@ class PostponeBucketRowKeyExtractor(RowKeyExtractor):
 
     def __init__(self, table_schema: TableSchema):
         super().__init__(table_schema)
-        num_buckets = table_schema.options.get(CoreOptions.BUCKET, -2)
+        num_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -2))
         if num_buckets != BucketMode.POSTPONE_BUCKET.value:
             raise ValueError(f"Postpone bucket mode requires bucket = -2, got {num_buckets}")
 
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index a00a2ac487..4761a73969 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -23,13 +23,24 @@ PACKAGES = find_packages(include=["pypaimon*"])
 
 install_requires = [
     'readerwriterlock==1.0.9',
-    'fsspec==2024.3.1',
-    'cachetools==5.3.3',
-    'ossfs==2023.12.0',
-    'pyarrow==16.0.0',
-    'polars==1.32.0',
-    'fastavro==1.11.1',
-    'zstandard==0.24.0'
+    'fsspec==2024.3.1; python_version>"3.6"',
+    'fsspec==2021.10.1; python_version=="3.6"',
+    'cachetools==5.3.3; python_version>"3.6"',
+    'cachetools==4.2.4; python_version=="3.6"',
+    'ossfs==2023.12.0; python_version>"3.6"',
+    'ossfs==2021.8.0; python_version=="3.6"',
+    'pyarrow>=16;   python_version >= "3.8"',
+    'pyarrow==6.0.1; python_version < "3.8"',
+    'pandas==2.3.2; python_version >= "3.7"',
+    'pandas==1.1.5; python_version < "3.7"',
+    'polars==1.32.0; python_version>"3.6"',
+    'polars==0.9.12; python_version=="3.6"',
+    'fastavro==1.11.1; python_version>"3.6"',
+    'fastavro==1.4.7; python_version=="3.6"',
+    'zstandard==0.24.0; python_version>="3.7"',
+    'zstandard==0.19.0; python_version<"3.7"',
+    'dataclasses==0.8.0; python_version < "3.7"',
+    'pip==21.3.1'
 ]
 
 long_description = "See Apache Paimon Python API \
@@ -51,10 +62,12 @@ setup(
     classifiers=[
         "Development Status :: 5 - Production/Stable",
         "License :: OSI Approved :: Apache Software License",
+        "Programming Language :: Python :: 3.6",
+        "Programming Language :: Python :: 3.7",
         "Programming Language :: Python :: 3.8",
         "Programming Language :: Python :: 3.9",
         "Programming Language :: Python :: 3.10",
         "Programming Language :: Python :: 3.11",
     ],
-    python_requires=">=3.8",
+    python_requires=">=3.6",
 )
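
The per-version pins above rely on PEP 508 environment markers, which pip evaluates against the running interpreter at install time. A hedged sanity check using the packaging library (not a project dependency; shown only for illustration):

    from packaging.markers import Marker

    marker = Marker('python_version < "3.7"')
    print(marker.evaluate({"python_version": "3.6"}))   # True  -> pandas==1.1.5
    print(marker.evaluate({"python_version": "3.10"}))  # False -> pandas==2.3.2
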
