This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e771c68f1f [python] Fix failing to access DLF under python 3.6 (#6207)
e771c68f1f is described below
commit e771c68f1f2886d4d92436f42dfc2870cf7b86b1
Author: umi <[email protected]>
AuthorDate: Fri Sep 5 19:05:35 2025 +0800
[python] Fix failing to access DLF under python 3.6 (#6207)
---
.github/workflows/paimon-python-checks.yml | 40 ++++-
.../pypaimon/catalog/rest/rest_catalog.py | 4 +-
paimon-python/pypaimon/common/file_io.py | 5 +-
paimon-python/pypaimon/common/predicate.py | 6 +-
paimon-python/pypaimon/read/table_scan.py | 4 +-
paimon-python/pypaimon/table/file_store_table.py | 6 +-
.../pypaimon/tests/py36/ao_read_write_test.py | 172 ++++++++++++++++++++-
paimon-python/pypaimon/write/row_key_extractor.py | 2 +-
paimon-python/setup.py | 29 +++-
9 files changed, 239 insertions(+), 29 deletions(-)
diff --git a/.github/workflows/paimon-python-checks.yml b/.github/workflows/paimon-python-checks.yml
index 6f6a8a23a8..3e7a05c710 100644
--- a/.github/workflows/paimon-python-checks.yml
+++ b/.github/workflows/paimon-python-checks.yml
@@ -20,6 +20,10 @@ name: Check Code Style and Test
on:
push:
+ paths:
+ - 'paimon-python/**'
+ - '!**/*.md'
+ - '.github/workflows/paimon-python-checks.yml'
pull_request:
paths:
- 'paimon-python/**'
@@ -27,7 +31,7 @@ on:
- '.github/workflows/paimon-python-checks.yml'
env:
- PYTHON_VERSION: "3.10"
+ PYTHON_VERSIONS: "['3.6.15', '3.10']"
concurrency:
group: ${{ github.workflow }}-${{ github.event_name }}-${{ github.event.number || github.run_id }}
@@ -36,18 +40,40 @@ concurrency:
jobs:
lint-python:
runs-on: ubuntu-latest
+ container: "python:${{ matrix.python-version }}-slim"
+ strategy:
+ matrix:
+ python-version: ['3.6.15', '3.10']
steps:
- name: Checkout code
uses: actions/checkout@v2
- - name: Set up Python
- uses: actions/setup-python@v4
- with:
- python-version: ${{ env.PYTHON_VERSION }}
- - name: Install dependencies
+
+ - name: Install system dependencies
+ shell: bash
+ run: |
+ apt-get update && apt-get install -y \
+ build-essential \
+ git \
+ curl \
+ && rm -rf /var/lib/apt/lists/*
+
+ - name: Verify Python version
+ run: python --version
+
+ - name: Install Python dependencies
+ shell: bash
run: |
- python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
+ if [[ "${{ matrix.python-version }}" == "3.6.15" ]]; then
+ python -m pip install --upgrade pip==21.3.1
+ python --version
+ python -m pip install -q readerwriterlock==1.0.9 'fsspec==2021.10.1' 'cachetools==4.2.4' 'ossfs==2021.8.0' pyarrow==6.0.1 pandas==1.1.5 'polars==0.9.12' 'fastavro==1.4.7' zstandard==0.19.0 dataclasses==0.8.0 flake8 pytest py4j==0.10.9.9 requests 2>&1 >/dev/null
+ else
+ python -m pip install --upgrade pip
+ python -m pip install -q readerwriterlock==1.0.9 fsspec==2024.3.1 cachetools==5.3.3 ossfs==2023.12.0 ray==2.48.0 fastavro==1.11.1 pyarrow==16.0.0 zstandard==0.24.0 polars==1.32.0 duckdb==1.3.2 numpy==1.24.3 pandas==2.0.3 flake8==4.0.1 pytest~=7.0 py4j==0.10.9.9 requests 2>&1 >/dev/null
+ fi
- name: Run lint-python.sh
+ shell: bash
run: |
chmod +x paimon-python/dev/lint-python.sh
./paimon-python/dev/lint-python.sh
diff --git a/paimon-python/pypaimon/catalog/rest/rest_catalog.py b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
index 31cafb2578..f203169fbd 100644
--- a/paimon-python/pypaimon/catalog/rest/rest_catalog.py
+++ b/paimon-python/pypaimon/catalog/rest/rest_catalog.py
@@ -205,8 +205,10 @@ class RESTCatalog(Catalog):
)
path_parsed = urlparse(schema.options.get(CoreOptions.PATH))
path = Path(path_parsed.path) if path_parsed.scheme is None else Path(schema.options.get(CoreOptions.PATH))
+ table_path = path_parsed.netloc + "/" + path_parsed.path \
+ if path_parsed.scheme == "file" else path_parsed.path[1:]
table = self.create(data_file_io(path),
- Path(path_parsed.netloc + "/" + path_parsed.path),
+ Path(table_path),
schema,
catalog_env)
return table
diff --git a/paimon-python/pypaimon/common/file_io.py b/paimon-python/pypaimon/common/file_io.py
index 8830a6a537..c98322543e 100644
--- a/paimon-python/pypaimon/common/file_io.py
+++ b/paimon-python/pypaimon/common/file_io.py
@@ -57,14 +57,13 @@ class FileIO:
def _initialize_oss_fs(self) -> FileSystem:
from pyarrow.fs import S3FileSystem
-
+ bucket_name = self.properties.get("prefix")
client_kwargs = {
- "endpoint_override": self.properties.get(OssOptions.OSS_ENDPOINT),
+ "endpoint_override": bucket_name + "." +
self.properties.get(OssOptions.OSS_ENDPOINT),
"access_key": self.properties.get(OssOptions.OSS_ACCESS_KEY_ID),
"secret_key":
self.properties.get(OssOptions.OSS_ACCESS_KEY_SECRET),
"session_token":
self.properties.get(OssOptions.OSS_SECURITY_TOKEN),
"region": self.properties.get(OssOptions.OSS_REGION),
- "force_virtual_addressing": True,
}
return S3FileSystem(**client_kwargs)
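
Note on the change above: the patch drops force_virtual_addressing, which the pyarrow 6.0.1 pinned for Python 3.6 does not appear to support, and instead builds a virtual-hosted-style endpoint by hand from the bucket name (the "prefix" property) and the OSS endpoint. A minimal sketch of that pattern, with placeholder bucket, endpoint and credentials rather than values from this commit:

    from pyarrow.fs import S3FileSystem

    bucket = "my-bucket"                        # placeholder; the patch reads this from properties["prefix"]
    endpoint = "oss-cn-hangzhou.aliyuncs.com"   # placeholder for OssOptions.OSS_ENDPOINT
    fs = S3FileSystem(
        endpoint_override=bucket + "." + endpoint,  # <bucket>.<endpoint>, i.e. virtual-hosted addressing
        access_key="ak", secret_key="sk",           # placeholder credentials
        region="cn-hangzhou",                       # placeholder region
    )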
diff --git a/paimon-python/pypaimon/common/predicate.py b/paimon-python/pypaimon/common/predicate.py
index ce4172148c..c8a4070c6a 100644
--- a/paimon-python/pypaimon/common/predicate.py
+++ b/paimon-python/pypaimon/common/predicate.py
@@ -214,7 +214,7 @@ class Predicate:
return result
except Exception:
# Fallback to True
- return True
+ return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
elif self.method == 'endsWith':
pattern = self.literals[0]
# For PyArrow compatibility
@@ -226,7 +226,7 @@ class Predicate:
return result
except Exception:
# Fallback to True
- return True
+ return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
elif self.method == 'contains':
pattern = self.literals[0]
# For PyArrow compatibility
@@ -238,7 +238,7 @@ class Predicate:
return result
except Exception:
# Fallback to True
- return True
+ return pyarrow_dataset.field(self.field).is_valid() | pyarrow_dataset.field(self.field).is_null()
elif self.method == 'between':
return (pyarrow_dataset.field(self.field) >= self.literals[0]) & \
(pyarrow_dataset.field(self.field) <= self.literals[1])
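
Note on the three fallback changes above: the old fallback returned the Python literal True, which is not a pyarrow Expression and so cannot be AND/OR-ed with other converted predicates or passed on as a dataset filter, which is presumably what broke on the older pyarrow pinned for Python 3.6. Returning field.is_valid() | field.is_null() keeps the "match everything" semantics while staying composable. A minimal sketch, with "f0" and "f1" as illustrative column names:

    import pyarrow.dataset as ds

    field = ds.field("f1")                            # illustrative column
    always_true = field.is_valid() | field.is_null()  # true for every row, null or not
    combined = (ds.field("f0") > 10) & always_true    # still a composable Expression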
diff --git a/paimon-python/pypaimon/read/table_scan.py b/paimon-python/pypaimon/read/table_scan.py
index 2a927c0055..a4d92bb796 100644
--- a/paimon-python/pypaimon/read/table_scan.py
+++ b/paimon-python/pypaimon/read/table_scan.py
@@ -67,8 +67,8 @@ class TableScan:
self.idx_of_this_subtask = None
self.number_of_para_subtasks = None
- self.only_read_real_buckets = True \
- if (self.table.options.get('bucket', -1) == BucketMode.POSTPONE_BUCKET.value) else False
+ self.only_read_real_buckets = True if int(
+ self.table.options.get('bucket', -1)) == BucketMode.POSTPONE_BUCKET.value else False
def plan(self) -> Plan:
latest_snapshot = self.snapshot_manager.get_latest_snapshot()
diff --git a/paimon-python/pypaimon/table/file_store_table.py b/paimon-python/pypaimon/table/file_store_table.py
index 74aaaff4e6..bcde56a5b2 100644
--- a/paimon-python/pypaimon/table/file_store_table.py
+++ b/paimon-python/pypaimon/table/file_store_table.py
@@ -70,9 +70,9 @@ class FileStoreTable(Table):
def bucket_mode(self) -> BucketMode:
if self.is_primary_key_table:
- if self.options.get(CoreOptions.BUCKET, -1) == -2:
+ if int(self.options.get(CoreOptions.BUCKET, -1)) == -2:
return BucketMode.POSTPONE_MODE
- elif self.options.get(CoreOptions.BUCKET, -1) == -1:
+ elif int(self.options.get(CoreOptions.BUCKET, -1)) == -1:
if self.cross_partition_update:
return BucketMode.CROSS_PARTITION
else:
@@ -80,7 +80,7 @@ class FileStoreTable(Table):
else:
return BucketMode.HASH_FIXED
else:
- if self.options.get(CoreOptions.BUCKET, -1) == -1:
+ if int(self.options.get(CoreOptions.BUCKET, -1)) == -1:
return BucketMode.BUCKET_UNAWARE
else:
return BucketMode.HASH_FIXED
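
Note on the int() casts above (and the matching one in row_key_extractor.py below): option values can come back as strings, for example from the REST catalog, in which case a direct comparison with -1 or -2 silently evaluates to False. A minimal sketch of the pitfall, with a hypothetical options dict:

    options = {"bucket": "-2"}                    # option values may arrive as strings
    print(options.get("bucket", -1) == -2)        # False: "-2" != -2, so postpone mode is missed
    print(int(options.get("bucket", -1)) == -2)   # True once the value is normalized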
diff --git a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
index e27b681e90..575bf535e3 100644
--- a/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
+++ b/paimon-python/pypaimon/tests/py36/ao_read_write_test.py
@@ -15,10 +15,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
+import logging
+
+import pandas as pd
import pyarrow as pa
-from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+from pypaimon.api.options import Options
+from pypaimon.catalog.catalog_context import CatalogContext
+from pypaimon.catalog.catalog_factory import CatalogFactory
+from pypaimon.catalog.rest.rest_catalog import RESTCatalog
+from pypaimon.common.identifier import Identifier
from pypaimon.schema.schema import Schema
+from pypaimon.tests.py36.pyarrow_compat import table_sort_by
from pypaimon.tests.rest_catalog_base_test import RESTCatalogBaseTest
@@ -113,3 +121,165 @@ class RESTTableReadWritePy36Test(RESTCatalogBaseTest):
actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
expected = self.expected.select(['dt', 'user_id'])
self.assertEqual(actual, expected)
+
+ def testAvroAppendOnlyReaderWithProjection(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'], options={'file.format': 'avro'})
+ self.rest_catalog.create_table('default.test_avro_append_only_projection', schema, False)
+ table = self.rest_catalog.get_table('default.test_avro_append_only_projection')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder().with_projection(['dt', 'user_id'])
+ actual = table_sort_by(self._read_test_table(read_builder), 'user_id')
+ expected = self.expected.select(['dt', 'user_id'])
+ self.assertEqual(actual, expected)
+
+ def testAppendOnlyReaderWithLimit(self):
+ schema = Schema.from_pyarrow_schema(self.pa_schema, partition_keys=['dt'])
+ self.rest_catalog.create_table('default.test_append_only_limit', schema, False)
+ table = self.rest_catalog.get_table('default.test_append_only_limit')
+ self._write_test_table(table)
+
+ read_builder = table.new_read_builder().with_limit(1)
+ actual = self._read_test_table(read_builder)
+ # only records from 1st commit (1st split) will be read
+ # might be split of "dt=1" or split of "dt=2"
+ self.assertEqual(actual.num_rows, 4)
+
+ def testWriteWrongSchema(self):
+ self.rest_catalog.create_table('default.test_wrong_schema',
+ Schema.from_pyarrow_schema(self.pa_schema),
+ False)
+ table = self.rest_catalog.get_table('default.test_wrong_schema')
+
+ data = {
+ 'f0': [1, 2, 3],
+ 'f1': ['a', 'b', 'c'],
+ }
+ df = pd.DataFrame(data)
+ schema = pa.schema([
+ ('f0', pa.int64()),
+ ('f1', pa.string())
+ ])
+ record_batch = pa.RecordBatch.from_pandas(df, schema)
+
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+
+ with self.assertRaises(ValueError) as e:
+ table_write.write_arrow_batch(record_batch)
+ self.assertTrue(str(e.exception).startswith("Input schema isn't consistent with table schema."))
+
+ def testWriteWideTableLargeData(self):
+ logging.basicConfig(level=logging.INFO)
+ catalog = CatalogFactory.create(self.options)
+
+ # Build table structure: 200 data columns + 1 partition column
+ # Create PyArrow schema
+ pa_fields = []
+
+ # Create 200 data columns f0 to f199
+ for i in range(200):
+ pa_fields.append(pa.field(f"f{i}", pa.string(),
metadata={"description": f"Column f{i}"}))
+
+ # Add partition column dt
+ pa_fields.append(pa.field("dt", pa.string(), metadata={"description":
"Partition column dt"}))
+
+ # Create PyArrow schema
+ pa_schema = pa.schema(pa_fields)
+
+ # Convert to Paimon Schema and specify partition key
+ schema = Schema.from_pyarrow_schema(pa_schema, partition_keys=["dt"])
+
+ # Create table
+ table_identifier = Identifier.create("default", "wide_table_200cols")
+ try:
+ # If table already exists, drop it first
+ try:
+ catalog.get_table(table_identifier)
+ catalog.drop_table(table_identifier)
+ print(f"Dropped existing table {table_identifier}")
+ except Exception:
+ # Table does not exist, continue creating
+ pass
+
+ # Create new table
+ catalog.create_table(
+ identifier=table_identifier,
+ schema=schema,
+ ignore_if_exists=False
+ )
+
+ print(
+ f"Successfully created table {table_identifier} with
{len(pa_fields) - 1} "
+ f"data columns and 1 partition column")
+ print(
+ f"Table schema: {len([f for f in pa_fields if f.name !=
'dt'])} data columns (f0-f199) + dt partition")
+
+ except Exception as e:
+ print(f"Error creating table: {e}")
+ raise e
+ import random
+
+ table_identifier = Identifier.create("default", "wide_table_200cols")
+ table = catalog.get_table(table_identifier)
+
+ total_rows = 500000 # rows of data
+ batch_size = 100000 # 100,000 rows per batch
+ commit_batches = total_rows // batch_size
+
+ for commit_batch in range(commit_batches):
+ start_idx = commit_batch * batch_size
+ end_idx = start_idx + batch_size
+
+ print(f"Processing batch {commit_batch + 1}/{commit_batches}
({start_idx:,} - {end_idx:,})...")
+ # Generate data for current batch - generate data for all 200
columns
+ data = {}
+ # Generate data for f0-f199
+ for i in range(200):
+ if i == 0:
+ data[f"f{i}"] = [f'value_{j}' for j in range(start_idx,
end_idx)]
+ elif i == 1:
+ data[f"f{i}"] = [random.choice(['A', 'B', 'C', 'D', 'E'])
for _ in range(batch_size)]
+ elif i == 2:
+ data[f"f{i}"] = [f'detail_{random.randint(1, 1000)}' for _
in range(batch_size)]
+ elif i == 3:
+ data[f"f{i}"] = [f'id_{j:06d}' for j in range(start_idx,
end_idx)]
+ else:
+ # Generate random string data for other columns
+ data[f"f{i}"] = [f'col{i}_val_{random.randint(1, 10000)}'
for _ in range(batch_size)]
+
+ # Add partition column data
+ data['dt'] = ['2025-09-01' for _ in range(batch_size)]
+ # Convert dictionary to PyArrow RecordBatch
+ arrow_batch = pa.RecordBatch.from_pydict(data)
+ # Create new write and commit objects for each commit batch
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ try:
+ # Write current batch data
+ table_write.write_arrow_batch(arrow_batch)
+ print("Batch data write completed, committing...")
+ # Commit current batch
+ commit_messages = table_write.prepare_commit()
+ table_commit.commit(commit_messages)
+ print(f"Batch {commit_batch + 1} committed successfully!
Written {end_idx:,} rows of data")
+
+ finally:
+ # Ensure resource cleanup
+ table_write.close()
+ table_commit.close()
+
+ print(
+ f"All data writing completed! "
+ f"Total written {total_rows:,} rows of data to 200-column wide
table in {commit_batches} commits")
+ rest_catalog = RESTCatalog(CatalogContext.create_from_options(Options(self.options)))
+ table = rest_catalog.get_table('default.wide_table_200cols')
+ predicate_builder = table.new_read_builder().new_predicate_builder()
+ read_builder = (table.new_read_builder()
+ .with_projection(['f0', 'f1'])
+ .with_filter(predicate=predicate_builder.equal("dt", "2025-09-01")))
+ table_read = read_builder.new_read()
+ splits = read_builder.new_scan().plan().splits()
+ self.assertEqual(table_read.to_arrow(splits).num_rows, total_rows)
diff --git a/paimon-python/pypaimon/write/row_key_extractor.py b/paimon-python/pypaimon/write/row_key_extractor.py
index aa28aacef2..c1b7ebf7d0 100644
--- a/paimon-python/pypaimon/write/row_key_extractor.py
+++ b/paimon-python/pypaimon/write/row_key_extractor.py
@@ -134,7 +134,7 @@ class PostponeBucketRowKeyExtractor(RowKeyExtractor):
def __init__(self, table_schema: TableSchema):
super().__init__(table_schema)
- num_buckets = table_schema.options.get(CoreOptions.BUCKET, -2)
+ num_buckets = int(table_schema.options.get(CoreOptions.BUCKET, -2))
if num_buckets != BucketMode.POSTPONE_BUCKET.value:
raise ValueError(f"Postpone bucket mode requires bucket = -2, got
{num_buckets}")
diff --git a/paimon-python/setup.py b/paimon-python/setup.py
index a00a2ac487..4761a73969 100644
--- a/paimon-python/setup.py
+++ b/paimon-python/setup.py
@@ -23,13 +23,24 @@ PACKAGES = find_packages(include=["pypaimon*"])
install_requires = [
'readerwriterlock==1.0.9',
- 'fsspec==2024.3.1',
- 'cachetools==5.3.3',
- 'ossfs==2023.12.0',
- 'pyarrow==16.0.0',
- 'polars==1.32.0',
- 'fastavro==1.11.1',
- 'zstandard==0.24.0'
+ 'fsspec==2024.3.1; python_version>"3.6"',
+ 'fsspec==2021.10.1; python_version=="3.6"',
+ 'cachetools==5.3.3; python_version>"3.6"',
+ 'cachetools==4.2.4; python_version=="3.6"',
+ 'ossfs==2023.12.0; python_version>"3.6"',
+ 'ossfs==2021.8.0; python_version=="3.6"',
+ 'pyarrow>=16; python_version >= "3.8"',
+ 'pyarrow==6.0.1; python_version < "3.8"',
+ 'pandas==2.3.2; python_version >= "3.7"',
+ 'pandas==1.1.5; python_version < "3.7"',
+ 'polars==1.32.0; python_version>"3.6"',
+ 'polars==0.9.12; python_version=="3.6"',
+ 'fastavro==1.11.1; python_version>"3.6"',
+ 'fastavro==1.4.7; python_version=="3.6"',
+ 'zstandard==0.24.0; python_version>="3.7"',
+ 'zstandard==0.19.0; python_version<"3.7"',
+ 'dataclasses==0.8.0; python_version < "3.7"',
+ 'pip==21.3.1'
]
long_description = "See Apache Paimon Python API \
@@ -51,10 +62,12 @@ setup(
classifiers=[
"Development Status :: 5 - Production/Stable",
"License :: OSI Approved :: Apache Software License",
+ "Programming Language :: Python :: 3.6",
+ "Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
- python_requires=">=3.8",
+ python_requires=">=3.6",
)