This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 17727e7a50 [python] Fix zstd manifest decompression error on Python 3.6 (#6982)
17727e7a50 is described below
commit 17727e7a50f0c0249201403e4760b163d1869e63
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Jan 9 13:21:15 2026 +0800
[python] Fix zstd manifest decompression error on Python 3.6 (#6982)
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 2 +
.../java/org/apache/paimon/JavaPyLanceE2ETest.java | 13 +---
paimon-python/dev/lint-python.sh | 4 --
paimon-python/dev/run_mixed_tests.sh | 7 +-
paimon-python/pypaimon/__init__.py | 7 ++
paimon-python/pypaimon/manifest/__init__.py | 9 +++
.../pypaimon/manifest/fastavro_py36_compat.py | 77 ++++++++++++++++++++++
.../pypaimon/tests/e2e/java_py_read_write_test.py | 35 ++++++----
8 files changed, 126 insertions(+), 28 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 39c2011bf4..03642b0e55 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -53,6 +53,7 @@ import org.apache.paimon.utils.TraceableFileIO;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import java.nio.file.Files;
@@ -356,6 +357,7 @@ public class JavaPyE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
public void testReadPkTable() throws Exception {
Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
Table table = catalog.getTable(identifier);
diff --git a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
index 290193340a..a431657b95 100644
--- a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
+++ b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
@@ -51,6 +51,7 @@ import org.apache.arrow.vector.types.pojo.FieldType;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
import java.nio.file.Files;
@@ -260,19 +261,9 @@ public class JavaPyLanceE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
public void testReadPkTableLance() throws Exception {
try {
- // Known issue: Reading Python-written Lance files in Java causes
JVM crash due to
- // missing Tokio runtime. The error is:
- // "there is no reactor running, must be called from the context
of a Tokio 1.x runtime"
- //
- // This is a limitation of lance-core Java bindings. The Rust
native library requires
- // Tokio runtime for certain operations when reading files written
by Python (which may
- // use different encoding formats). Java-written files can be read
successfully because
- // they use synchronous APIs that don't require Tokio.
- //
- // Workaround: Try to "warm up" Tokio runtime by reading a
Java-written file first.
- // This may initialize the Tokio runtime if it's created on first
use.
try {
Identifier warmupIdentifier =
identifier("mixed_test_pk_tablej_lance");
try {
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index 469ee56c9d..d174b120ad 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -203,10 +203,6 @@ function mixed_check() {
# Get Python version
PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')")
echo "Detected Python version: $PYTHON_VERSION"
- if [ "$PYTHON_VERSION" = "3.6" ]; then
- print_function "STAGE" "mixed tests checks... [SKIPPED]"
- return
- fi
print_function "STAGE" "mixed tests checks"
# Path to the mixed tests script
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index 337b694e0c..38a0ada6c2 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -138,11 +138,14 @@ run_java_read_test() {
cd "$PROJECT_ROOT"
+ PYTHON_VERSION=$(python -c "import sys;
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null ||
echo "unknown")
+ echo "Detected Python version: $PYTHON_VERSION"
+
# Run Java test for parquet format in paimon-core
echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read
Parquet)..."
echo "Note: Maven may download dependencies on first run, this may take a
while..."
local parquet_result=0
- if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl
paimon-core -Drun.e2e.tests=true; then
+ if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl
paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
else
echo -e "${RED}✗ Java read parquet test failed${NC}"
@@ -155,7 +158,7 @@ run_java_read_test() {
echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java
Read Lance)..."
echo "Note: Maven may download dependencies on first run, this may take a
while..."
local lance_result=0
- if mvn test
-Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl
paimon-lance -Drun.e2e.tests=true; then
+ if mvn test
-Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl
paimon-lance -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
echo -e "${GREEN}✓ Java read lance test completed successfully${NC}"
else
echo -e "${RED}✗ Java read lance test failed${NC}"
diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py
index 5313e8e18a..024f8b732b 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -15,6 +15,13 @@
# specific language governing permissions and limitations
# under the License.
+import sys
+# On Python 3.6, import the fastavro compatibility module purely for its
+# import-time side effect (it patches fastavro's zstd block readers) before
+# the public API below is imported. An ImportError is deliberately ignored so
+# that pypaimon still imports, just without the patch.
+if sys.version_info[:2] == (3, 6):
+    try:
+        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
+    except ImportError:
+        pass
+
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
from pypaimon.schema.schema import Schema
diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py
index 65b48d4d79..4b50145a36 100644
--- a/paimon-python/pypaimon/manifest/__init__.py
+++ b/paimon-python/pypaimon/manifest/__init__.py
@@ -15,3 +15,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
+
+# Apply the fastavro Python 3.6 compatibility patch early, before any other
+# manifest modules that might use fastavro are imported. The compat module
+# applies its patch as an import-time side effect; an ImportError is ignored
+# so this package still imports (unpatched) when the module cannot be loaded.
+import sys
+if sys.version_info[:2] == (3, 6):
+    try:
+        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
+    except ImportError:
+        pass
diff --git a/paimon-python/pypaimon/manifest/fastavro_py36_compat.py b/paimon-python/pypaimon/manifest/fastavro_py36_compat.py
new file mode 100644
index 0000000000..f6f509fc0e
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/fastavro_py36_compat.py
@@ -0,0 +1,77 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+"""
+Provides compatibility patches for fastavro on Python 3.6,
+specifically for handling zstd-compressed Avro files.
+
+The main issue addressed is:
+- On Python 3.6, fastavro's zstd decompression may fail with:
+ "zstd.ZstdError: could not determine content size in frame header"
+
+This module patches fastavro's zstd handling to use a more compatible
+decompression method that works on Python 3.6.
+"""
+
+import sys
+
+_patch_applied = False
+
+
+def _apply_zstd_patch():
+ global _patch_applied
+ if _patch_applied or sys.version_info[:2] != (3, 6):
+ return
+
+ try:
+ import fastavro._read as fastavro_read
+ import zstandard as zstd
+ from io import BytesIO
+ except (ImportError, AttributeError):
+ return
+
+ def _fixed_zstandard_read_block(decoder):
+ from fastavro._read import read_long
+
+ length = read_long(decoder)
+
+ if hasattr(decoder, 'read_fixed'):
+ data = decoder.read_fixed(length)
+ elif hasattr(decoder, 'read'):
+ data = decoder.read(length)
+ else:
+ from fastavro._read import read_fixed
+ data = read_fixed(decoder, length)
+
+ decompressor = zstd.ZstdDecompressor()
+ with decompressor.stream_reader(BytesIO(data)) as reader:
+ decompressed = reader.read()
+ return BytesIO(decompressed)
+
+ if hasattr(fastavro_read, 'BLOCK_READERS'):
+ block_readers = fastavro_read.BLOCK_READERS
+ block_readers['zstandard'] = _fixed_zstandard_read_block
+ block_readers['zstd'] = _fixed_zstandard_read_block
+ _patch_applied = True
+
+
+if sys.version_info[:2] == (3, 6):
+ try:
+ _apply_zstd_patch()
+ except ImportError:
+ pass
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index b88224784c..7a194574f5 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -17,6 +17,7 @@
################################################################################
import os
+import sys
import unittest
import pandas as pd
@@ -25,6 +26,19 @@ from parameterized import parameterized
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.schema.schema import Schema
+if sys.version_info[:2] == (3, 6):
+ from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+else:
+ def table_sort_by(table: pa.Table, column_name: str, order: str =
'ascending') -> pa.Table:
+ return table.sort_by([(column_name, order)])
+
+
+def get_file_format_params():
+ if sys.version_info[:2] == (3, 6):
+ return [('parquet',)]
+ else:
+ return [('parquet',), ('lance',)]
+
class JavaPyReadWriteTest(unittest.TestCase):
@classmethod
@@ -89,11 +103,13 @@ class JavaPyReadWriteTest(unittest.TestCase):
res = table_read.to_pandas(table_scan.plan().splits())
print(res)
- @parameterized.expand([
- ('parquet',),
- ('lance',),
- ])
+ @parameterized.expand(get_file_format_params())
def test_py_write_read_pk_table(self, file_format):
+ if sys.version_info[:2] == (3, 6):
+ self.skipTest(
+ "Skipping on Python 3.6 due to PyArrow compatibility issue
(RecordBatch.add_column not available). "
+ "Will be fixed in next PR."
+ )
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -150,10 +166,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)
- @parameterized.expand([
- ('parquet',),
- ('lance',),
- ])
+ @parameterized.expand(get_file_format_params())
def test_read_pk_table(self, file_format):
# For parquet, read from Java-written table (no format suffix)
# For lance, read from Java-written table (with format suffix)
@@ -196,7 +209,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
- actual = table_read.to_arrow(splits).sort_by('pt')
+ actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1, 2, 2],
'a': [10, 21, 22],
@@ -219,7 +232,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
- actual = table_read.to_arrow(splits).sort_by('pt')
+ actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1] * 9999,
'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
@@ -242,7 +255,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
read_builder = table.new_read_builder()
table_read = read_builder.new_read()
splits = read_builder.new_scan().plan().splits()
- actual = table_read.to_arrow(splits).sort_by('pt')
+ actual = table_sort_by(table_read.to_arrow(splits), 'pt')
expected = pa.Table.from_pydict({
'pt': [1] * 9999,
'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],