This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 17727e7a50 [python] Fix zstd manifest decompression error on Python 3.6 (#6982)
17727e7a50 is described below

commit 17727e7a50f0c0249201403e4760b163d1869e63
Author: XiaoHongbo <[email protected]>
AuthorDate: Fri Jan 9 13:21:15 2026 +0800

    [python] Fix zstd manifest decompression error on Python 3.6 (#6982)
---
 .../test/java/org/apache/paimon/JavaPyE2ETest.java |  2 +
 .../java/org/apache/paimon/JavaPyLanceE2ETest.java | 13 +---
 paimon-python/dev/lint-python.sh                   |  4 --
 paimon-python/dev/run_mixed_tests.sh               |  7 +-
 paimon-python/pypaimon/__init__.py                 |  7 ++
 paimon-python/pypaimon/manifest/__init__.py        |  9 +++
 .../pypaimon/manifest/fastavro_py36_compat.py      | 77 ++++++++++++++++++++++
 .../pypaimon/tests/e2e/java_py_read_write_test.py  | 35 ++++++----
 8 files changed, 126 insertions(+), 28 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 39c2011bf4..03642b0e55 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -53,6 +53,7 @@ import org.apache.paimon.utils.TraceableFileIO;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
 import java.nio.file.Files;
@@ -356,6 +357,7 @@ public class JavaPyE2ETest {
 
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
     public void testReadPkTable() throws Exception {
         Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
         Table table = catalog.getTable(identifier);
diff --git a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
index 290193340a..a431657b95 100644
--- a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
+++ b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
@@ -51,6 +51,7 @@ import org.apache.arrow.vector.types.pojo.FieldType;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
 
 import java.nio.file.Files;
@@ -260,19 +261,9 @@ public class JavaPyLanceE2ETest {
 
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+    @DisabledIfSystemProperty(named = "python.version", matches = "3.6")
     public void testReadPkTableLance() throws Exception {
         try {
-            // Known issue: Reading Python-written Lance files in Java causes JVM crash due to
-            // missing Tokio runtime. The error is:
-            // "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
-            //
-            // This is a limitation of lance-core Java bindings. The Rust native library requires
-            // Tokio runtime for certain operations when reading files written by Python (which may
-            // use different encoding formats). Java-written files can be read successfully because
-            // they use synchronous APIs that don't require Tokio.
-            //
-            // Workaround: Try to "warm up" Tokio runtime by reading a Java-written file first.
-            // This may initialize the Tokio runtime if it's created on first use.
             try {
                 Identifier warmupIdentifier = identifier("mixed_test_pk_tablej_lance");
                 try {
diff --git a/paimon-python/dev/lint-python.sh b/paimon-python/dev/lint-python.sh
index 469ee56c9d..d174b120ad 100755
--- a/paimon-python/dev/lint-python.sh
+++ b/paimon-python/dev/lint-python.sh
@@ -203,10 +203,6 @@ function mixed_check() {
     # Get Python version
     PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')")
     echo "Detected Python version: $PYTHON_VERSION"
-    if [ "$PYTHON_VERSION" = "3.6" ]; then
-        print_function "STAGE" "mixed tests checks... [SKIPPED]"
-        return
-    fi
     print_function "STAGE" "mixed tests checks"
 
     # Path to the mixed tests script
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index 337b694e0c..38a0ada6c2 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -138,11 +138,14 @@ run_java_read_test() {
 
     cd "$PROJECT_ROOT"
 
+    PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
+    echo "Detected Python version: $PYTHON_VERSION"
+
     # Run Java test for parquet format in paimon-core
     echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read Parquet)..."
     echo "Note: Maven may download dependencies on first run, this may take a while..."
     local parquet_result=0
-    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true; then
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
         echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
     else
         echo -e "${RED}✗ Java read parquet test failed${NC}"
@@ -155,7 +158,7 @@ run_java_read_test() {
     echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..."
     echo "Note: Maven may download dependencies on first run, this may take a while..."
     local lance_result=0
-    if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true; then
+    if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
         echo -e "${GREEN}✓ Java read lance test completed successfully${NC}"
     else
         echo -e "${RED}✗ Java read lance test failed${NC}"
diff --git a/paimon-python/pypaimon/__init__.py b/paimon-python/pypaimon/__init__.py
index 5313e8e18a..024f8b732b 100644
--- a/paimon-python/pypaimon/__init__.py
+++ b/paimon-python/pypaimon/__init__.py
@@ -15,6 +15,13 @@
 #  specific language governing permissions and limitations
 #  under the License.
 
+import sys
+if sys.version_info[:2] == (3, 6):
+    try:
+        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
+    except ImportError:
+        pass
+
 from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.filesystem.pvfs import PaimonVirtualFileSystem
 from pypaimon.schema.schema import Schema
diff --git a/paimon-python/pypaimon/manifest/__init__.py b/paimon-python/pypaimon/manifest/__init__.py
index 65b48d4d79..4b50145a36 100644
--- a/paimon-python/pypaimon/manifest/__init__.py
+++ b/paimon-python/pypaimon/manifest/__init__.py
@@ -15,3 +15,12 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
+
+# Apply fastavro Python 3.6 compatibility patch early, before any other
+# manifest modules are imported that might use fastavro
+import sys
+if sys.version_info[:2] == (3, 6):
+    try:
+        from pypaimon.manifest import fastavro_py36_compat  # noqa: F401
+    except ImportError:
+        pass
diff --git a/paimon-python/pypaimon/manifest/fastavro_py36_compat.py b/paimon-python/pypaimon/manifest/fastavro_py36_compat.py
new file mode 100644
index 0000000000..f6f509fc0e
--- /dev/null
+++ b/paimon-python/pypaimon/manifest/fastavro_py36_compat.py
@@ -0,0 +1,77 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+################################################################################
+
+"""
+Provides compatibility patches for fastavro on Python 3.6,
+specifically for handling zstd-compressed Avro files.
+
+The main issue addressed is:
+- On Python 3.6, fastavro's zstd decompression may fail with:
+  "zstd.ZstdError: could not determine content size in frame header"
+  
+This module patches fastavro's zstd handling to use a more compatible
+decompression method that works on Python 3.6.
+"""
+
+import sys
+
+_patch_applied = False
+
+
+def _apply_zstd_patch():
+    global _patch_applied
+    if _patch_applied or sys.version_info[:2] != (3, 6):
+        return
+
+    try:
+        import fastavro._read as fastavro_read
+        import zstandard as zstd
+        from io import BytesIO
+    except (ImportError, AttributeError):
+        return
+
+    def _fixed_zstandard_read_block(decoder):
+        from fastavro._read import read_long
+        
+        length = read_long(decoder)
+        
+        if hasattr(decoder, 'read_fixed'):
+            data = decoder.read_fixed(length)
+        elif hasattr(decoder, 'read'):
+            data = decoder.read(length)
+        else:
+            from fastavro._read import read_fixed
+            data = read_fixed(decoder, length)
+
+        decompressor = zstd.ZstdDecompressor()
+        with decompressor.stream_reader(BytesIO(data)) as reader:
+            decompressed = reader.read()
+            return BytesIO(decompressed)
+
+    if hasattr(fastavro_read, 'BLOCK_READERS'):
+        block_readers = fastavro_read.BLOCK_READERS
+        block_readers['zstandard'] = _fixed_zstandard_read_block
+        block_readers['zstd'] = _fixed_zstandard_read_block
+        _patch_applied = True
+
+
+if sys.version_info[:2] == (3, 6):
+    try:
+        _apply_zstd_patch()
+    except ImportError:
+        pass
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index b88224784c..7a194574f5 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -17,6 +17,7 @@
 ################################################################################
 
 import os
+import sys
 import unittest
 
 import pandas as pd
@@ -25,6 +26,19 @@ from parameterized import parameterized
 from pypaimon.catalog.catalog_factory import CatalogFactory
 from pypaimon.schema.schema import Schema
 
+if sys.version_info[:2] == (3, 6):
+    from pypaimon.tests.py36.pyarrow_compat import table_sort_by
+else:
+    def table_sort_by(table: pa.Table, column_name: str, order: str = 'ascending') -> pa.Table:
+        return table.sort_by([(column_name, order)])
+
+
+def get_file_format_params():
+    if sys.version_info[:2] == (3, 6):
+        return [('parquet',)]
+    else:
+        return [('parquet',), ('lance',)]
+
 
 class JavaPyReadWriteTest(unittest.TestCase):
     @classmethod
@@ -89,11 +103,13 @@ class JavaPyReadWriteTest(unittest.TestCase):
         res = table_read.to_pandas(table_scan.plan().splits())
         print(res)
 
-    @parameterized.expand([
-        ('parquet',),
-        ('lance',),
-    ])
+    @parameterized.expand(get_file_format_params())
     def test_py_write_read_pk_table(self, file_format):
+        if sys.version_info[:2] == (3, 6):
+            self.skipTest(
+                "Skipping on Python 3.6 due to PyArrow compatibility issue (RecordBatch.add_column not available). "
+                "Will be fixed in next PR."
+            )
         pa_schema = pa.schema([
             ('id', pa.int32()),
             ('name', pa.string()),
@@ -150,10 +166,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
         actual_names = set(initial_result['name'].tolist())
         self.assertEqual(actual_names, expected_names)
 
-    @parameterized.expand([
-        ('parquet',),
-        ('lance',),
-    ])
+    @parameterized.expand(get_file_format_params())
     def test_read_pk_table(self, file_format):
         # For parquet, read from Java-written table (no format suffix)
         # For lance, read from Java-written table (with format suffix)
@@ -196,7 +209,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits).sort_by('pt')
+        actual = table_sort_by(table_read.to_arrow(splits), 'pt')
         expected = pa.Table.from_pydict({
             'pt': [1, 2, 2],
             'a': [10, 21, 22],
@@ -219,7 +232,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits).sort_by('pt')
+        actual = table_sort_by(table_read.to_arrow(splits), 'pt')
         expected = pa.Table.from_pydict({
             'pt': [1] * 9999,
             'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],
@@ -242,7 +255,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
         read_builder = table.new_read_builder()
         table_read = read_builder.new_read()
         splits = read_builder.new_scan().plan().splits()
-        actual = table_read.to_arrow(splits).sort_by('pt')
+        actual = table_sort_by(table_read.to_arrow(splits), 'pt')
         expected = pa.Table.from_pydict({
             'pt': [1] * 9999,
             'a': [i * 10 for i in range(1, 10001) if i * 10 != 81930],

Reply via email to