This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 88332deba5 [python] Avro schema inconsistents between Java and Python 
(#7077)
88332deba5 is described below

commit 88332deba57d22506f34fa29d35386583c43e9be
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jan 22 17:45:58 2026 +0800

    [python] Avro schema inconsistents between Java and Python (#7077)
---
 .../test/java/org/apache/paimon/JavaPyE2ETest.java | 267 +++++++++++----------
 paimon-python/dev/requirements-dev.txt             |   3 +-
 paimon-python/dev/run_mixed_tests.sh               |  34 +--
 paimon-python/pypaimon/schema/data_types.py        |  57 +++--
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  90 ++++---
 5 files changed, 261 insertions(+), 190 deletions(-)

diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java 
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 39c2011bf4..ef9ba888fc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -106,118 +106,130 @@ public class JavaPyE2ETest {
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testJavaWriteReadAppendTable() throws Exception {
-        Identifier identifier = identifier("mixed_test_append_tablej");
-        Schema schema =
-                Schema.newBuilder()
-                        .column("id", DataTypes.INT())
-                        .column("name", DataTypes.STRING())
-                        .column("category", DataTypes.STRING())
-                        .column("value", DataTypes.DOUBLE())
-                        .partitionKeys("category")
-                        .option("dynamic-partition-overwrite", "false")
-                        .build();
-
-        catalog.createTable(identifier, schema, true);
-        Table table = catalog.getTable(identifier);
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-
-        try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
-                InnerTableCommit commit = 
fileStoreTable.newCommit(commitUser)) {
-
-            write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
-            write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
-            write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
-            write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
-            write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
-            write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
-
-            commit.commit(0, write.prepareCommit(true, 0));
+        for (String format : Arrays.asList("parquet", "orc", "avro")) {
+            Identifier identifier = identifier("mixed_test_append_tablej_" + 
format);
+            Schema schema =
+                    Schema.newBuilder()
+                            .column("id", DataTypes.INT())
+                            .column("name", DataTypes.STRING())
+                            .column("category", DataTypes.STRING())
+                            .column("value", DataTypes.DOUBLE())
+                            .column("ts", DataTypes.TIMESTAMP())
+                            .column("ts_ltz", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                            .partitionKeys("category")
+                            .option("dynamic-partition-overwrite", "false")
+                            .option("file.format", format)
+                            .build();
+
+            catalog.createTable(identifier, schema, true);
+            Table table = catalog.getTable(identifier);
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+            try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
+                    InnerTableCommit commit = 
fileStoreTable.newCommit(commitUser)) {
+
+                write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 
2000000L));
+                write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 
1000001L, 2000001L));
+                write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 
1000002L, 2000002L));
+                write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 
1000003L, 2000003L));
+                write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 
1000004L, 2000004L));
+                write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 
2000005L));
+
+                commit.commit(0, write.prepareCommit(true, 0));
+            }
+
+            List<Split> splits =
+                    new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+            TableRead read = fileStoreTable.newRead();
+            List<String> res =
+                    getResult(
+                            read,
+                            splits,
+                            row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
+            assertThat(res)
+                    .containsExactlyInAnyOrder(
+                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 
1970-01-01T00:33:20",
+                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 
1970-01-01T00:33:20.001",
+                            "3, Carrot, Vegetable, 0.6, 
1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
+                            "4, Broccoli, Vegetable, 1.2, 
1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
+                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 
1970-01-01T00:33:20.004",
+                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 
1970-01-01T00:33:20.005");
         }
-
-        List<Split> splits =
-                new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
-        TableRead read = fileStoreTable.newRead();
-        List<String> res =
-                getResult(
-                        read,
-                        splits,
-                        row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
-        assertThat(res)
-                .containsExactlyInAnyOrder(
-                        "1, Apple, Fruit, 1.5",
-                        "2, Banana, Fruit, 0.8",
-                        "3, Carrot, Vegetable, 0.6",
-                        "4, Broccoli, Vegetable, 1.2",
-                        "5, Chicken, Meat, 5.0",
-                        "6, Beef, Meat, 8.0");
     }
 
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testReadAppendTable() throws Exception {
-        Identifier identifier = identifier("mixed_test_append_tablep");
-        Table table = catalog.getTable(identifier);
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        List<Split> splits =
-                new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
-        TableRead read = fileStoreTable.newRead();
-        List<String> res =
-                getResult(
-                        read,
-                        splits,
-                        row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
-        System.out.println(res);
+        for (String format : Arrays.asList("parquet", "orc", "avro")) {
+            Identifier identifier = identifier("mixed_test_append_tablep_" + 
format);
+            Table table = catalog.getTable(identifier);
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+            List<Split> splits =
+                    new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+            TableRead read = fileStoreTable.newRead();
+            List<String> res =
+                    getResult(
+                            read,
+                            splits,
+                            row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
+            System.out.println(res);
+        }
     }
 
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testJavaWriteReadPkTable() throws Exception {
-        Identifier identifier = identifier("mixed_test_pk_tablej");
-        Schema schema =
-                Schema.newBuilder()
-                        .column("id", DataTypes.INT())
-                        .column("name", DataTypes.STRING())
-                        .column("category", DataTypes.STRING())
-                        .column("value", DataTypes.DOUBLE())
-                        .primaryKey("id")
-                        .partitionKeys("category")
-                        .option("dynamic-partition-overwrite", "false")
-                        .option("bucket", "2")
-                        .build();
-
-        catalog.createTable(identifier, schema, true);
-        Table table = catalog.getTable(identifier);
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-
-        try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
-                InnerTableCommit commit = 
fileStoreTable.newCommit(commitUser)) {
-
-            write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
-            write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
-            write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
-            write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
-            write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
-            write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
-
-            commit.commit(0, write.prepareCommit(true, 0));
+        for (String format : Arrays.asList("parquet", "orc", "avro")) {
+            Identifier identifier = identifier("mixed_test_pk_tablej_" + 
format);
+            Schema schema =
+                    Schema.newBuilder()
+                            .column("id", DataTypes.INT())
+                            .column("name", DataTypes.STRING())
+                            .column("category", DataTypes.STRING())
+                            .column("value", DataTypes.DOUBLE())
+                            .column("ts", DataTypes.TIMESTAMP())
+                            .column("ts_ltz", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                            .primaryKey("id")
+                            .partitionKeys("category")
+                            .option("dynamic-partition-overwrite", "false")
+                            .option("bucket", "2")
+                            .option("file.format", format)
+                            .build();
+
+            catalog.createTable(identifier, schema, true);
+            Table table = catalog.getTable(identifier);
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+            try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
+                    InnerTableCommit commit = 
fileStoreTable.newCommit(commitUser)) {
+
+                write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 
2000000L));
+                write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 
1000001L, 2000001L));
+                write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 
1000002L, 2000002L));
+                write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 
1000003L, 2000003L));
+                write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 
1000004L, 2000004L));
+                write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 
2000005L));
+
+                commit.commit(0, write.prepareCommit(true, 0));
+            }
+
+            List<Split> splits =
+                    new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+            TableRead read = fileStoreTable.newRead();
+            List<String> res =
+                    getResult(
+                            read,
+                            splits,
+                            row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
+            assertThat(res)
+                    .containsExactlyInAnyOrder(
+                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 
1970-01-01T00:33:20",
+                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 
1970-01-01T00:33:20.001",
+                            "3, Carrot, Vegetable, 0.6, 
1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
+                            "4, Broccoli, Vegetable, 1.2, 
1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
+                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 
1970-01-01T00:33:20.004",
+                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 
1970-01-01T00:33:20.005");
         }
-
-        List<Split> splits =
-                new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
-        TableRead read = fileStoreTable.newRead();
-        List<String> res =
-                getResult(
-                        read,
-                        splits,
-                        row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
-        assertThat(res)
-                .containsExactlyInAnyOrder(
-                        "1, Apple, Fruit, 1.5",
-                        "2, Banana, Fruit, 0.8",
-                        "3, Carrot, Vegetable, 0.6",
-                        "4, Broccoli, Vegetable, 1.2",
-                        "5, Chicken, Meat, 5.0",
-                        "6, Beef, Meat, 8.0");
     }
 
     @Test
@@ -357,26 +369,31 @@ public class JavaPyE2ETest {
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testReadPkTable() throws Exception {
-        Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
-        Table table = catalog.getTable(identifier);
-        FileStoreTable fileStoreTable = (FileStoreTable) table;
-        List<Split> splits =
-                new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
-        TableRead read = fileStoreTable.newRead();
-        List<String> res =
-                getResult(
-                        read,
-                        splits,
-                        row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
-        System.out.println("Result: " + res);
-        assertThat(res)
-                .containsExactlyInAnyOrder(
-                        "1, Apple, Fruit, 1.5",
-                        "2, Banana, Fruit, 0.8",
-                        "3, Carrot, Vegetable, 0.6",
-                        "4, Broccoli, Vegetable, 1.2",
-                        "5, Chicken, Meat, 5.0",
-                        "6, Beef, Meat, 8.0");
+        for (String format : Arrays.asList("parquet", "orc", "avro")) {
+            Identifier identifier = identifier("mixed_test_pk_tablep_" + 
format);
+            Table table = catalog.getTable(identifier);
+            FileStoreTable fileStoreTable = (FileStoreTable) table;
+            List<Split> splits =
+                    new 
ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+            TableRead read = fileStoreTable.newRead();
+            List<String> res =
+                    getResult(
+                            read,
+                            splits,
+                            row -> DataFormatTestUtil.toStringNoRowKind(row, 
table.rowType()));
+            System.out.println("Result for " + format + " : " + res);
+            
assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
+            assertThat(table.rowType().getFieldTypes().get(5))
+                    .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+            assertThat(res)
+                    .containsExactlyInAnyOrder(
+                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 
1970-01-01T00:33:20",
+                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 
1970-01-01T00:33:20.001",
+                            "3, Carrot, Vegetable, 0.6, 
1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
+                            "4, Broccoli, Vegetable, 1.2, 
1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
+                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 
1970-01-01T00:33:20.004",
+                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 
1970-01-01T00:33:20.005");
+        }
     }
 
     // Helper method from TableTestBase
@@ -407,9 +424,15 @@ public class JavaPyE2ETest {
                 FileIOFinder.find(tablePath), tablePath, tableSchema, 
CatalogEnvironment.empty());
     }
 
-    private static InternalRow createRow4Cols(int id, String name, String 
category, double value) {
+    private static InternalRow createRow6Cols(
+            int id, String name, String category, double value, long ts, long 
tsLtz) {
         return GenericRow.of(
-                id, BinaryString.fromString(name), 
BinaryString.fromString(category), value);
+                id,
+                BinaryString.fromString(name),
+                BinaryString.fromString(category),
+                value,
+                org.apache.paimon.data.Timestamp.fromEpochMillis(ts),
+                org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz));
     }
 
     protected GenericRow createRow3Cols(Object... values) {
diff --git a/paimon-python/dev/requirements-dev.txt 
b/paimon-python/dev/requirements-dev.txt
index 0f61cccb68..12de5f0404 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/dev/requirements-dev.txt
@@ -22,4 +22,5 @@ duckdb==1.3.2
 flake8==4.0.1
 pytest~=7.0
 ray==2.48.0
-requests
\ No newline at end of file
+requests
+parameterized
diff --git a/paimon-python/dev/run_mixed_tests.sh 
b/paimon-python/dev/run_mixed_tests.sh
index a4a404341e..cd39c6e219 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -60,18 +60,18 @@ cleanup_warehouse() {
 
 # Function to run Java test
 run_java_write_test() {
-    echo -e "${YELLOW}=== Step 1: Running Java Write Tests (Parquet + Lance) 
===${NC}"
+    echo -e "${YELLOW}=== Step 1: Running Java Write Tests (Parquet/Orc/Avro + 
Lance) ===${NC}"
 
     cd "$PROJECT_ROOT"
 
-    # Run the Java test method for parquet format
-    echo "Running Maven test for JavaPyE2ETest.testJavaWriteReadPkTable 
(Parquet)..."
+    # Run the Java test method for Parquet/Orc/Avro format
+    echo "Running Maven test for JavaPyE2ETest.testJavaWriteReadPkTable 
(Parquet/Orc/Avro)..."
     echo "Note: Maven may download dependencies on first run, this may take a 
while..."
     local parquet_result=0
     if mvn test 
-Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteReadPkTable -pl paimon-core 
-Drun.e2e.tests=true; then
-        echo -e "${GREEN}✓ Java write parquet test completed successfully${NC}"
+        echo -e "${GREEN}✓ Java write Parquet/Orc/Avro test completed 
successfully${NC}"
     else
-        echo -e "${RED}✗ Java write parquet test failed${NC}"
+        echo -e "${RED}✗ Java write Parquet/Orc/Avro test failed${NC}"
         parquet_result=1
     fi
 
@@ -102,7 +102,7 @@ run_python_read_test() {
 
     cd "$PAIMON_PYTHON_DIR"
 
-    # Run the parameterized Python test method (runs for both parquet and 
lance)
+    # Run the parameterized Python test method (runs for both Parquet/Orc/Avro 
and Lance)
     echo "Running Python test for JavaPyReadWriteTest.test_read_pk_table..."
     if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k 
"test_read_pk_table" -v; then
         echo -e "${GREEN}✓ Python test completed successfully${NC}"
@@ -121,7 +121,7 @@ run_python_write_test() {
 
     cd "$PAIMON_PYTHON_DIR"
 
-    # Run the parameterized Python test method for writing data (runs for both 
parquet and lance)
+    # Run the parameterized Python test method for writing data (runs for both 
Parquet/Orc/Avro and Lance)
     echo "Running Python test for 
JavaPyReadWriteTest.test_py_write_read_pk_table (Python Write)..."
     if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k 
"test_py_write_read_pk_table" -v; then
         echo -e "${GREEN}✓ Python write test completed successfully${NC}"
@@ -134,21 +134,21 @@ run_python_write_test() {
 
 # Function to run Java Read test for Python-Write-Java-Read scenario
 run_java_read_test() {
-    echo -e "${YELLOW}=== Step 4: Running Java Read Test 
(JavaPyE2ETest.testReadPkTable for parquet, 
JavaPyLanceE2ETest.testReadPkTableLance for lance) ===${NC}"
+    echo -e "${YELLOW}=== Step 4: Running Java Read Test 
(JavaPyE2ETest.testReadPkTable for Parquet/Orc/Avro, 
JavaPyLanceE2ETest.testReadPkTableLance for Lance) ===${NC}"
 
     cd "$PROJECT_ROOT"
 
     PYTHON_VERSION=$(python -c "import sys; 
print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || 
echo "unknown")
     echo "Detected Python version: $PYTHON_VERSION"
 
-    # Run Java test for parquet format in paimon-core
-    echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read 
Parquet)..."
+    # Run Java test for Parquet/Orc/Avro format in paimon-core
+    echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read 
Parquet/Orc/Avro)..."
     echo "Note: Maven may download dependencies on first run, this may take a 
while..."
     local parquet_result=0
     if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl 
paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
-        echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
+        echo -e "${GREEN}✓ Java read Parquet/Orc/Avro test completed 
successfully${NC}"
     else
-        echo -e "${RED}✗ Java read parquet test failed${NC}"
+        echo -e "${RED}✗ Java read Parquet/Orc/Avro test failed${NC}"
         parquet_result=1
     fi
 
@@ -293,7 +293,7 @@ main() {
         echo ""
     fi
 
-    # Run Java read test (handles both parquet and lance)
+    # Run Java read test (handles both Parquet/Orc/Avro and Lance)
     if ! run_java_read_test; then
         java_read_result=1
     fi
@@ -314,9 +314,9 @@ main() {
     echo -e "${YELLOW}=== Test Results Summary ===${NC}"
 
     if [[ $java_write_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Java Write Test (Parquet + Lance): PASSED${NC}"
+        echo -e "${GREEN}✓ Java Write Test (Parquet/Orc/Avro + Lance): 
PASSED${NC}"
     else
-        echo -e "${RED}✗ Java Write Test (Parquet + Lance): FAILED${NC}"
+        echo -e "${RED}✗ Java Write Test (Parquet/Orc/Avro + Lance): 
FAILED${NC}"
     fi
 
     if [[ $python_read_result -eq 0 ]]; then
@@ -332,9 +332,9 @@ main() {
     fi
 
     if [[ $java_read_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Java Read Test (Parquet + Lance): PASSED${NC}"
+        echo -e "${GREEN}✓ Java Read Test (Parquet/Orc/Avro + Lance): 
PASSED${NC}"
     else
-        echo -e "${RED}✗ Java Read Test (Parquet + Lance): FAILED${NC}"
+        echo -e "${RED}✗ Java Read Test (Parquet/Orc/Avro + Lance): 
FAILED${NC}"
     fi
 
     if [[ $pk_dv_result -eq 0 ]]; then
diff --git a/paimon-python/pypaimon/schema/data_types.py 
b/paimon-python/pypaimon/schema/data_types.py
index 91404cb193..51787c7dcb 100755
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -434,20 +434,22 @@ class PyarrowFieldParser:
                     precision = int(match_p.group(1))
                     return pyarrow.decimal128(precision, 0)
             if type_name.startswith('TIMESTAMP'):
-                # WITH_LOCAL_TIME_ZONE is ambiguous and not supported
-                if type_name == 'TIMESTAMP':
-                    return pyarrow.timestamp('us', tz=None)  # default to 6
-                match = re.fullmatch(r'TIMESTAMP\((\d+)\)', type_name)
+                is_ltz = type_name.startswith('TIMESTAMP_LTZ') or 'WITH LOCAL 
TIME ZONE' in type_name
+                tz = 'UTC' if is_ltz else None
+
+                match = re.fullmatch(r'TIMESTAMP(?:_LTZ)?\((\d+)\)(?: WITH 
LOCAL TIME ZONE)?', type_name)
                 if match:
                     precision = int(match.group(1))
                     if precision == 0:
-                        return pyarrow.timestamp('s', tz=None)
+                        return pyarrow.timestamp('s', tz=tz)
                     elif 1 <= precision <= 3:
-                        return pyarrow.timestamp('ms', tz=None)
+                        return pyarrow.timestamp('ms', tz=tz)
                     elif 4 <= precision <= 6:
-                        return pyarrow.timestamp('us', tz=None)
+                        return pyarrow.timestamp('us', tz=tz)
                     elif 7 <= precision <= 9:
-                        return pyarrow.timestamp('ns', tz=None)
+                        return pyarrow.timestamp('ns', tz=tz)
+
+                return pyarrow.timestamp('us', tz=tz)  # default to 6
             elif type_name == 'DATE':
                 return pyarrow.date32()
             if type_name.startswith('TIME'):
@@ -517,9 +519,12 @@ class PyarrowFieldParser:
             type_name = 'BLOB'
         elif types.is_decimal(pa_type):
             type_name = f'DECIMAL({pa_type.precision}, {pa_type.scale})'
-        elif types.is_timestamp(pa_type) and pa_type.tz is None:
+        elif types.is_timestamp(pa_type):
             precision_mapping = {'s': 0, 'ms': 3, 'us': 6, 'ns': 9}
-            type_name = f'TIMESTAMP({precision_mapping[pa_type.unit]})'
+            if pa_type.tz is None:
+                type_name = f'TIMESTAMP({precision_mapping[pa_type.unit]})'
+            else:
+                type_name = f'TIMESTAMP_LTZ({precision_mapping[pa_type.unit]})'
         elif types.is_date32(pa_type):
             type_name = 'DATE'
         elif types.is_time(pa_type):
@@ -561,7 +566,8 @@ class PyarrowFieldParser:
         return fields
 
     @staticmethod
-    def to_avro_type(field_type: pyarrow.DataType, field_name: str) -> 
Union[str, Dict[str, Any]]:
+    def to_avro_type(field_type: pyarrow.DataType, field_name: str,
+                     parent_name: str = "record") -> Union[str, Dict[str, 
Any]]:
         if pyarrow.types.is_integer(field_type):
             if (pyarrow.types.is_signed_integer(field_type) and 
field_type.bit_width <= 32) or \
                (pyarrow.types.is_unsigned_integer(field_type) and 
field_type.bit_width < 32):
@@ -589,31 +595,40 @@ class PyarrowFieldParser:
             return {"type": "int", "logicalType": "date"}
         elif pyarrow.types.is_timestamp(field_type):
             unit = field_type.unit
-            if unit == 'us':
-                return {"type": "long", "logicalType": "timestamp-micros"}
-            elif unit == 'ms':
-                return {"type": "long", "logicalType": "timestamp-millis"}
+            if field_type.tz is None:
+                if unit == 'ms':
+                    return {"type": "long", "logicalType": "timestamp-millis"}
+                elif unit == 'us':
+                    return {"type": "long", "logicalType": "timestamp-micros"}
+                else:
+                    raise ValueError(f"Avro does not support pyarrow timestamp 
with unit {unit}.")
             else:
-                return {"type": "long", "logicalType": "timestamp-micros"}
+                if unit == 'ms':
+                    return {"type": "long", "logicalType": 
"local-timestamp-millis"}
+                elif unit == 'us':
+                    return {"type": "long", "logicalType": 
"local-timestamp-micros"}
+                else:
+                    raise ValueError(f"Avro does not support pyarrow timestamp 
with unit {unit}.")
         elif pyarrow.types.is_list(field_type) or 
pyarrow.types.is_large_list(field_type):
             value_field = field_type.value_field
             return {
                 "type": "array",
-                "items": PyarrowFieldParser.to_avro_type(value_field.type, 
value_field.name)
+                "items": PyarrowFieldParser.to_avro_type(value_field.type, 
value_field.name, parent_name)
             }
         elif pyarrow.types.is_struct(field_type):
-            return PyarrowFieldParser.to_avro_schema(field_type, 
name="{}_record".format(field_name))
+            nested_name = "{}_{}".format(parent_name, field_name)
+            return PyarrowFieldParser.to_avro_schema(field_type, 
name=nested_name)
 
         raise ValueError("Unsupported pyarrow type for Avro conversion: 
{}".format(field_type))
 
     @staticmethod
     def to_avro_schema(pyarrow_schema: Union[pyarrow.Schema, 
pyarrow.StructType],
-                       name: str = "Root",
-                       namespace: str = "pyarrow.avro"
+                       name: str = "record",
+                       namespace: str = "org.apache.paimon.avro.generated"
                        ) -> Dict[str, Any]:
         fields = []
         for field in pyarrow_schema:
-            avro_type = PyarrowFieldParser.to_avro_type(field.type, field.name)
+            avro_type = PyarrowFieldParser.to_avro_type(field.type, 
field.name, parent_name=name)
             if field.nullable:
                 avro_type = ["null", avro_type]
             fields.append({"name": field.name, "type": avro_type})
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py 
b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 661c5799f7..71e1fbb217 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -35,9 +35,9 @@ else:
 
 def get_file_format_params():
     if sys.version_info[:2] == (3, 6):
-        return [('parquet',)]
+        return [('parquet',), ('orc',), ('avro',)]
     else:
-        return [('parquet',), ('lance',)]
+        return [('parquet',), ('orc',), ('avro',), ('lance',)]
 
 
 class JavaPyReadWriteTest(unittest.TestCase):
@@ -50,28 +50,34 @@ class JavaPyReadWriteTest(unittest.TestCase):
         })
         cls.catalog.create_database('default', True)
 
-    def test_py_write_read_append_table(self):
+    @parameterized.expand(get_file_format_params())
+    def test_py_write_read_append_table(self, file_format):
         pa_schema = pa.schema([
             ('id', pa.int32()),
             ('name', pa.string()),
             ('category', pa.string()),
-            ('value', pa.float64())
+            ('value', pa.float64()),
+            ('ts', pa.timestamp('us')),
+            ('ts_ltz', pa.timestamp('us', tz='UTC'))
         ])
 
         schema = Schema.from_pyarrow_schema(
             pa_schema,
             partition_keys=['category'],
-            options={'dynamic-partition-overwrite': 'false'}
+            options={'dynamic-partition-overwrite': 'false', 'file.format': 
file_format}
         )
 
-        self.catalog.create_table('default.mixed_test_append_tablep', schema, 
False)
-        table = self.catalog.get_table('default.mixed_test_append_tablep')
+        table_name = f'default.mixed_test_append_tablep_{file_format}'
+        self.catalog.create_table(table_name, schema, False)
+        table = self.catalog.get_table(table_name)
 
         initial_data = pd.DataFrame({
             'id': [1, 2, 3, 4, 5, 6],
             'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 
'Beef'],
             'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 
'Meat'],
-            'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
+            'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
+            'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 
1000005], unit='ms'),
+            'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 
2000004, 2000005], unit='ms', utc=True)
         })
         # Write initial data
         write_builder = table.new_batch_write_builder()
@@ -95,8 +101,9 @@ class JavaPyReadWriteTest(unittest.TestCase):
         actual_names = set(initial_result['name'].tolist())
         self.assertEqual(actual_names, expected_names)
 
-    def test_read_append_table(self):
-        table = self.catalog.get_table('default.mixed_test_append_tablej')
+    @parameterized.expand(get_file_format_params())
+    def test_read_append_table(self, file_format):
+        table = self.catalog.get_table('default.mixed_test_append_tablej_' + 
file_format)
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
         table_read = read_builder.new_read()
@@ -105,12 +112,23 @@ class JavaPyReadWriteTest(unittest.TestCase):
 
     @parameterized.expand(get_file_format_params())
     def test_py_write_read_pk_table(self, file_format):
-        pa_schema = pa.schema([
-            ('id', pa.int32()),
-            ('name', pa.string()),
-            ('category', pa.string()),
-            ('value', pa.float64())
-        ])
+        # Lance format doesn't support timestamp, so exclude timestamp columns
+        if file_format == 'lance':
+            pa_schema = pa.schema([
+                ('id', pa.int32()),
+                ('name', pa.string()),
+                ('category', pa.string()),
+                ('value', pa.float64())
+            ])
+        else:
+            pa_schema = pa.schema([
+                ('id', pa.int32()),
+                ('name', pa.string()),
+                ('category', pa.string()),
+                ('value', pa.float64()),
+                ('ts', pa.timestamp('us')),
+                ('ts_ltz', pa.timestamp('us', tz='UTC'))
+            ])
 
         table_name = f'default.mixed_test_pk_tablep_{file_format}'
         schema = Schema.from_pyarrow_schema(
@@ -120,7 +138,8 @@ class JavaPyReadWriteTest(unittest.TestCase):
             options={
                 'dynamic-partition-overwrite': 'false',
                 'bucket': '2',
-                'file.format': file_format
+                'file.format': file_format,
+                "orc.timestamp-ltz.legacy.type": "false"
             }
         )
 
@@ -135,12 +154,23 @@ class JavaPyReadWriteTest(unittest.TestCase):
         self.catalog.create_table(table_name, schema, False)
         table = self.catalog.get_table(table_name)
 
-        initial_data = pd.DataFrame({
-            'id': [1, 2, 3, 4, 5, 6],
-            'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 
'Beef'],
-            'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 
'Meat'],
-            'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
-        })
+        # Lance format doesn't support timestamp, so exclude timestamp columns
+        if file_format == 'lance':
+            initial_data = pd.DataFrame({
+                'id': [1, 2, 3, 4, 5, 6],
+                'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 
'Beef'],
+                'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 
'Meat', 'Meat'],
+                'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
+            })
+        else:
+            initial_data = pd.DataFrame({
+                'id': [1, 2, 3, 4, 5, 6],
+                'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 
'Beef'],
+                'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 
'Meat', 'Meat'],
+                'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
+                'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 
1000004, 1000005], unit='ms'),
+                'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 
2000004, 2000005], unit='ms', utc=True)
+            })
         write_builder = table.new_batch_write_builder()
         table_write = write_builder.new_write()
         table_commit = write_builder.new_commit()
@@ -163,12 +193,11 @@ class JavaPyReadWriteTest(unittest.TestCase):
 
     @parameterized.expand(get_file_format_params())
     def test_read_pk_table(self, file_format):
-        # For parquet, read from Java-written table (no format suffix)
-        # For lance, read from Java-written table (with format suffix)
-        if file_format == 'parquet':
-            table_name = 'default.mixed_test_pk_tablej'
-        else:
-            table_name = f'default.mixed_test_pk_tablej_{file_format}'
+        # Skip ORC format for Python < 3.8 due to pyarrow limitation with 
TIMESTAMP_INSTANT
+        if sys.version_info[:2] < (3, 8) and file_format == 'orc':
+            self.skipTest("Skipping ORC format for Python < 3.8 (pyarrow does 
not support TIMESTAMP_INSTANT)")
+        
+        table_name = f'default.mixed_test_pk_tablej_{file_format}'
         table = self.catalog.get_table(table_name)
         read_builder = table.new_read_builder()
         table_scan = read_builder.new_scan()
@@ -178,6 +207,9 @@ class JavaPyReadWriteTest(unittest.TestCase):
 
         # Verify data
         self.assertEqual(len(res), 6)
+        if file_format != "lance":
+            self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)")
+            self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH 
LOCAL TIME ZONE")
         # Data order may vary due to partitioning/bucketing, so compare as sets
         expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 
'Beef'}
         actual_names = set(res['name'].tolist())


Reply via email to