This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fcc845a27d [python] add support for ROW/STRUCT types in PyPaimon (#7129)
fcc845a27d is described below

commit fcc845a27de4324ebd661847e6bbf89550e4ce14
Author: ChengHui Chen <[email protected]>
AuthorDate: Tue Jan 27 17:29:33 2026 +0800

    [python] add support for ROW/STRUCT types in PyPaimon (#7129)
---
 .github/workflows/e2e-tests-flink-1.x.yml          |   1 +
 .github/workflows/e2e-tests-flink-2.x-jdk11.yml    |   1 +
 .github/workflows/utitcase-flink-1.x-common.yml    |   1 +
 .github/workflows/utitcase-flink-1.x-others.yml    |   1 +
 .github/workflows/utitcase-flink-2.x-jdk11.yml     |   1 +
 .github/workflows/utitcase-jdk11.yml               |   1 +
 .github/workflows/utitcase-spark-3.x.yml           |   1 +
 .github/workflows/utitcase-spark-4.x.yml           |   1 +
 .github/workflows/utitcase.yml                     |   1 +
 .../test/java/org/apache/paimon/JavaPyE2ETest.java | 201 ++++++++++++---------
 paimon-python/pypaimon/schema/data_types.py        |  17 ++
 paimon-python/pypaimon/tests/data_types_test.py    |  71 +++++++-
 .../pypaimon/tests/e2e/java_py_read_write_test.py  |  33 +++-
 13 files changed, 245 insertions(+), 86 deletions(-)

diff --git a/.github/workflows/e2e-tests-flink-1.x.yml b/.github/workflows/e2e-tests-flink-1.x.yml
index 476b17ec3e..47033b9f6e 100644
--- a/.github/workflows/e2e-tests-flink-1.x.yml
+++ b/.github/workflows/e2e-tests-flink-1.x.yml
@@ -27,6 +27,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 941fcc0e86..49ccc7e99a 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -27,6 +27,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml
index 03a6d1e815..b5f1bab891 100644
--- a/.github/workflows/utitcase-flink-1.x-common.yml
+++ b/.github/workflows/utitcase-flink-1.x-common.yml
@@ -26,6 +26,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-flink-1.x-others.yml b/.github/workflows/utitcase-flink-1.x-others.yml
index 3c21037c68..0a1d9715c6 100644
--- a/.github/workflows/utitcase-flink-1.x-others.yml
+++ b/.github/workflows/utitcase-flink-1.x-others.yml
@@ -26,6 +26,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 4be7d4ae62..129520a0b5 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -27,6 +27,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml
index a5db2019a3..931f5e7b3b 100644
--- a/.github/workflows/utitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -26,6 +26,7 @@ on:
       - '**/*.md'
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index 3fff587799..a08c5bad45 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -27,6 +27,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index fc031342c4..50ef7f8bad 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -27,6 +27,7 @@ on:
       - 'paimon-python/**'
       - '.github/workflows/paimon-python-checks.yml'
       - 'paimon-lucene/**'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 17
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index 52423a026e..915ec0385a 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -29,6 +29,7 @@ on:
       - 'paimon-lucene/**'
       - 'paimon-faiss/**'
       - '.github/workflows/faiss-vector-index-tests.yml'
+      - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
 
 env:
   JDK_VERSION: 8
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index ef9ba888fc..99e14a010f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -103,60 +103,6 @@ public class JavaPyE2ETest {
         }
     }
 
-    @Test
-    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
-    public void testJavaWriteReadAppendTable() throws Exception {
-        for (String format : Arrays.asList("parquet", "orc", "avro")) {
-            Identifier identifier = identifier("mixed_test_append_tablej_" + format);
-            Schema schema =
-                    Schema.newBuilder()
-                            .column("id", DataTypes.INT())
-                            .column("name", DataTypes.STRING())
-                            .column("category", DataTypes.STRING())
-                            .column("value", DataTypes.DOUBLE())
-                            .column("ts", DataTypes.TIMESTAMP())
-                            .column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
-                            .partitionKeys("category")
-                            .option("dynamic-partition-overwrite", "false")
-                            .option("file.format", format)
-                            .build();
-
-            catalog.createTable(identifier, schema, true);
-            Table table = catalog.getTable(identifier);
-            FileStoreTable fileStoreTable = (FileStoreTable) table;
-
-            try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
-                    InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
-
-                write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
-                write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
-                write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
-                write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
-                write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
-                write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
-
-                commit.commit(0, write.prepareCommit(true, 0));
-            }
-
-            List<Split> splits =
-                    new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
-            TableRead read = fileStoreTable.newRead();
-            List<String> res =
-                    getResult(
-                            read,
-                            splits,
-                            row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
-            assertThat(res)
-                    .containsExactlyInAnyOrder(
-                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
-                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
-                            "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
-                            "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
-                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
-                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
-        }
-    }
-
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testReadAppendTable() throws Exception {
@@ -189,6 +135,21 @@ public class JavaPyE2ETest {
                             .column("value", DataTypes.DOUBLE())
                             .column("ts", DataTypes.TIMESTAMP())
                             .column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                            .column(
+                                    "metadata",
+                                    DataTypes.ROW(
+                                            DataTypes.FIELD(0, "source", DataTypes.STRING()),
+                                            DataTypes.FIELD(1, "created_at", DataTypes.BIGINT()),
+                                            DataTypes.FIELD(
+                                                    2,
+                                                    "location",
+                                                    DataTypes.ROW(
+                                                            DataTypes.FIELD(
+                                                                    0, "city", DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    1,
+                                                                    "country",
+                                                                    DataTypes.STRING())))))
                             .primaryKey("id")
                             .partitionKeys("category")
                             .option("dynamic-partition-overwrite", "false")
@@ -203,12 +164,54 @@ public class JavaPyE2ETest {
             try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
                     InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
 
-                write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
-                write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
-                write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
-                write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
-                write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
-                write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
+                write.write(
+                        createRow7Cols(
+                                1, "Apple", "Fruit", 1.5, 1000000L, 2000000L, "store1", 1001L,
+                                "Beijing", "China"));
+                write.write(
+                        createRow7Cols(
+                                2,
+                                "Banana",
+                                "Fruit",
+                                0.8,
+                                1000001L,
+                                2000001L,
+                                "store1",
+                                1002L,
+                                "Shanghai",
+                                "China"));
+                write.write(
+                        createRow7Cols(
+                                3,
+                                "Carrot",
+                                "Vegetable",
+                                0.6,
+                                1000002L,
+                                2000002L,
+                                "store2",
+                                1003L,
+                                "Tokyo",
+                                "Japan"));
+                write.write(
+                        createRow7Cols(
+                                4,
+                                "Broccoli",
+                                "Vegetable",
+                                1.2,
+                                1000003L,
+                                2000003L,
+                                "store2",
+                                1004L,
+                                "Seoul",
+                                "Korea"));
+                write.write(
+                        createRow7Cols(
+                                5, "Chicken", "Meat", 5.0, 1000004L, 2000004L, "store3", 1005L,
+                                "NewYork", "USA"));
+                write.write(
+                        createRow7Cols(
+                                6, "Beef", "Meat", 8.0, 1000005L, 2000005L, "store3", 1006L,
+                                "London", "UK"));
 
                 commit.commit(0, write.prepareCommit(true, 0));
             }
@@ -217,18 +220,15 @@ public class JavaPyE2ETest {
                     new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
             TableRead read = fileStoreTable.newRead();
             List<String> res =
-                    getResult(
-                            read,
-                            splits,
-                            row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+                    getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
             assertThat(res)
                     .containsExactlyInAnyOrder(
-                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
-                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
-                            "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
-                            "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
-                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
-                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
+                            "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]",
+                            "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]",
+                            "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]",
+                            "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
+                            "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
+                            "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
         }
     }
 
@@ -377,22 +377,22 @@ public class JavaPyE2ETest {
                     new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
             TableRead read = fileStoreTable.newRead();
             List<String> res =
-                    getResult(
-                            read,
-                            splits,
-                            row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+                    getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
             System.out.println("Result for " + format + " : " + res);
             assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
             assertThat(table.rowType().getFieldTypes().get(5))
                     .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+            assertThat(table.rowType().getFieldTypes().get(6)).isInstanceOf(RowType.class);
+            RowType metadataType = (RowType) table.rowType().getFieldTypes().get(6);
+            assertThat(metadataType.getFieldTypes().get(2)).isInstanceOf(RowType.class);
             assertThat(res)
                     .containsExactlyInAnyOrder(
-                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
-                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
-                            "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
-                            "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
-                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
-                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
+                            "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]",
+                            "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]",
+                            "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]",
+                            "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
+                            "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
+                            "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
         }
     }
 
@@ -424,15 +424,30 @@ public class JavaPyE2ETest {
                 FileIOFinder.find(tablePath), tablePath, tableSchema, CatalogEnvironment.empty());
     }
 
-    private static InternalRow createRow6Cols(
-            int id, String name, String category, double value, long ts, long tsLtz) {
+    private static InternalRow createRow7Cols(
+            int id,
+            String name,
+            String category,
+            double value,
+            long ts,
+            long tsLtz,
+            String metadataSource,
+            long metadataCreatedAt,
+            String city,
+            String country) {
+        GenericRow locationRow =
+                GenericRow.of(BinaryString.fromString(city), BinaryString.fromString(country));
+        GenericRow metadataRow =
+                GenericRow.of(
+                        BinaryString.fromString(metadataSource), metadataCreatedAt, locationRow);
         return GenericRow.of(
                 id,
                 BinaryString.fromString(name),
                 BinaryString.fromString(category),
                 value,
                 org.apache.paimon.data.Timestamp.fromEpochMillis(ts),
-                org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz));
+                org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz),
+                metadataRow);
     }
 
     protected GenericRow createRow3Cols(Object... values) {
@@ -442,4 +457,24 @@ public class JavaPyE2ETest {
     protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
         return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
     }
+
+    private static String rowToStringWithStruct(InternalRow row, RowType type) {
+        StringBuilder build = new StringBuilder();
+        build.append(row.getRowKind().shortString()).append("[");
+        for (int i = 0; i < type.getFieldCount(); i++) {
+            if (i != 0) {
+                build.append(", ");
+            }
+            if (row.isNullAt(i)) {
+                build.append("NULL");
+            } else {
+                InternalRow.FieldGetter fieldGetter =
+                        InternalRow.createFieldGetter(type.getTypeAt(i), i);
+                Object field = fieldGetter.getFieldOrNull(row);
+                build.append(DataFormatTestUtil.getDataFieldString(field, type.getTypeAt(i)));
+            }
+        }
+        build.append("]");
+        return build.toString();
+    }
 }
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 51787c7dcb..318ddfe02f 100755
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -472,6 +472,12 @@ class PyarrowFieldParser:
             key_type = PyarrowFieldParser.from_paimon_type(data_type.key)
             value_type = PyarrowFieldParser.from_paimon_type(data_type.value)
             return pyarrow.map_(key_type, value_type)
+        elif isinstance(data_type, RowType):
+            pa_fields = []
+            for field in data_type.fields:
+                pa_field_type = PyarrowFieldParser.from_paimon_type(field.type)
+                pa_fields.append(pyarrow.field(field.name, pa_field_type, nullable=field.type.nullable))
+            return pyarrow.struct(pa_fields)
         raise ValueError("Unsupported data type: {}".format(data_type))
 
     @staticmethod
@@ -539,6 +545,17 @@ class PyarrowFieldParser:
             key_type = PyarrowFieldParser.to_paimon_type(pa_type.key_type, nullable)
             value_type = PyarrowFieldParser.to_paimon_type(pa_type.item_type, nullable)
             return MapType(nullable, key_type, value_type)
+        elif types.is_struct(pa_type):
+            pa_type: pyarrow.StructType
+            fields = []
+            for i, pa_field in enumerate(pa_type):
+                field_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable)
+                fields.append(DataField(
+                    id=i,
+                    name=pa_field.name,
+                    type=field_type
+                ))
+            return RowType(nullable, fields)
         if type_name is not None:
             return AtomicType(type_name, nullable)
         raise ValueError("Unsupported pyarrow type: {}".format(pa_type))
diff --git a/paimon-python/pypaimon/tests/data_types_test.py b/paimon-python/pypaimon/tests/data_types_test.py
index 53644e24c5..6ace872997 100755
--- a/paimon-python/pypaimon/tests/data_types_test.py
+++ b/paimon-python/pypaimon/tests/data_types_test.py
@@ -17,8 +17,10 @@ limitations under the License.
 """
 import unittest
 from parameterized import parameterized
+import pyarrow as pa
 
-from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType, RowType
+from pypaimon.schema.data_types import (DataField, AtomicType, ArrayType, MultisetType, MapType,
+                                        RowType, PyarrowFieldParser)
 
 
 class DataTypesTest(unittest.TestCase):
@@ -65,3 +67,70 @@ class DataTypesTest(unittest.TestCase):
                                   DataField(1, "b", AtomicType("TIMESTAMP(6)"),)])
         self.assertEqual(str(row_data),
                          str(RowType.from_dict(row_data.to_dict())))
+
+    def test_struct_from_paimon_to_pyarrow(self):
+        paimon_row = RowType(
+            nullable=True,
+            fields=[
+                DataField(0, "field1", AtomicType("INT")),
+                DataField(1, "field2", AtomicType("STRING")),
+                DataField(2, "field3", AtomicType("DOUBLE"))
+            ]
+        )
+        pa_struct = PyarrowFieldParser.from_paimon_type(paimon_row)
+
+        self.assertTrue(pa.types.is_struct(pa_struct))
+        self.assertEqual(len(pa_struct), 3)
+        self.assertEqual(pa_struct[0].name, "field1")
+        self.assertEqual(pa_struct[1].name, "field2")
+        self.assertEqual(pa_struct[2].name, "field3")
+        self.assertTrue(pa.types.is_int32(pa_struct[0].type))
+        self.assertTrue(pa.types.is_string(pa_struct[1].type))
+        self.assertTrue(pa.types.is_float64(pa_struct[2].type))
+
+    def test_struct_from_pyarrow_to_paimon(self):
+        pa_struct = pa.struct([
+            pa.field("name", pa.string()),
+            pa.field("age", pa.int32()),
+            pa.field("score", pa.float64())
+        ])
+        paimon_row = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True)
+        
+        self.assertIsInstance(paimon_row, RowType)
+        self.assertTrue(paimon_row.nullable)
+        self.assertEqual(len(paimon_row.fields), 3)
+        self.assertEqual(paimon_row.fields[0].name, "name")
+        self.assertEqual(paimon_row.fields[1].name, "age")
+        self.assertEqual(paimon_row.fields[2].name, "score")
+        self.assertEqual(paimon_row.fields[0].type.type, "STRING")
+        self.assertEqual(paimon_row.fields[1].type.type, "INT")
+        self.assertEqual(paimon_row.fields[2].type.type, "DOUBLE")
+
+    def test_nested_field_roundtrip(self):
+        nested_field = RowType(
+            nullable=True,
+            fields=[
+                DataField(0, "inner_field1", AtomicType("STRING")),
+                DataField(1, "inner_field2", AtomicType("INT"))
+            ]
+        )
+        paimon_row = RowType(
+            nullable=True,
+            fields=[
+                DataField(0, "outer_field1", AtomicType("BIGINT")),
+                DataField(1, "nested", nested_field)
+            ]
+        )
+        pa_struct = PyarrowFieldParser.from_paimon_type(paimon_row)
+
+        converted_paimon_row = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True)
+        self.assertIsInstance(converted_paimon_row, RowType)
+        self.assertEqual(len(converted_paimon_row.fields), 2)
+        self.assertEqual(converted_paimon_row.fields[0].name, "outer_field1")
+        self.assertEqual(converted_paimon_row.fields[1].name, "nested")
+        
+        converted_nested_field = converted_paimon_row.fields[1].type
+        self.assertIsInstance(converted_nested_field, RowType)
+        self.assertEqual(len(converted_nested_field.fields), 2)
+        self.assertEqual(converted_nested_field.fields[0].name, "inner_field1")
+        self.assertEqual(converted_nested_field.fields[1].name, "inner_field2")
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 71e1fbb217..b56ced657c 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -127,7 +127,15 @@ class JavaPyReadWriteTest(unittest.TestCase):
                 ('category', pa.string()),
                 ('value', pa.float64()),
                 ('ts', pa.timestamp('us')),
-                ('ts_ltz', pa.timestamp('us', tz='UTC'))
+                ('ts_ltz', pa.timestamp('us', tz='UTC')),
+                ('metadata', pa.struct([
+                    pa.field('source', pa.string()),
+                    pa.field('created_at', pa.int64()),
+                    pa.field('location', pa.struct([
+                        pa.field('city', pa.string()),
+                        pa.field('country', pa.string())
+                    ]))
+                ]))
             ])
 
         table_name = f'default.mixed_test_pk_tablep_{file_format}'
@@ -169,7 +177,15 @@ class JavaPyReadWriteTest(unittest.TestCase):
                 'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
                 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
                 'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'),
-                'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True)
+                'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True),
+                'metadata': [
+                    {'source': 'store1', 'created_at': 1001, 'location': {'city': 'Beijing', 'country': 'China'}},
+                    {'source': 'store1', 'created_at': 1002, 'location': {'city': 'Shanghai', 'country': 'China'}},
+                    {'source': 'store2', 'created_at': 1003, 'location': {'city': 'Tokyo', 'country': 'Japan'}},
+                    {'source': 'store2', 'created_at': 1004, 'location': {'city': 'Seoul', 'country': 'Korea'}},
+                    {'source': 'store3', 'created_at': 1005, 'location': {'city': 'NewYork', 'country': 'USA'}},
+                    {'source': 'store3', 'created_at': 1006, 'location': {'city': 'London', 'country': 'UK'}}
+                ]
             })
         write_builder = table.new_batch_write_builder()
         table_write = write_builder.new_write()
@@ -210,11 +226,24 @@ class JavaPyReadWriteTest(unittest.TestCase):
         if file_format != "lance":
             self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)")
             self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH LOCAL TIME ZONE")
+            from pypaimon.schema.data_types import RowType
+            self.assertIsInstance(table.fields[6].type, RowType)
+            metadata_fields = table.fields[6].type.fields
+            self.assertEqual(len(metadata_fields), 3)
+            self.assertEqual(metadata_fields[0].name, 'source')
+            self.assertEqual(metadata_fields[1].name, 'created_at')
+            self.assertEqual(metadata_fields[2].name, 'location')
+            self.assertIsInstance(metadata_fields[2].type, RowType)
+        
         # Data order may vary due to partitioning/bucketing, so compare as sets
         expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
         actual_names = set(res['name'].tolist())
         self.assertEqual(actual_names, expected_names)
 
+        # Verify metadata column can be read and contains nested structures
+        if 'metadata' in res.columns:
+            self.assertFalse(res['metadata'].isnull().all())
+
         # For primary key tables, verify that _VALUE_KIND is written correctly
         # by checking if we can read the raw data with system fields
         # Note: Normal read filters out system fields, so we verify through Java read

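For reference, a minimal sketch of the ROW/STRUCT conversion exercised by the
tests above. It is an editor's illustration, not part of the commit; it assumes
an installed pyarrow and uses only names visible in this diff (RowType,
DataField, AtomicType, PyarrowFieldParser):

    # Sketch: Paimon ROW <-> pyarrow struct roundtrip via the new branches
    # added to PyarrowFieldParser (assumes pypaimon and pyarrow importable).
    import pyarrow as pa

    from pypaimon.schema.data_types import (AtomicType, DataField, RowType,
                                            PyarrowFieldParser)

    # Nested ROW type mirroring the "metadata" column used in the e2e tests.
    location = RowType(nullable=True, fields=[
        DataField(0, "city", AtomicType("STRING")),
        DataField(1, "country", AtomicType("STRING")),
    ])
    metadata = RowType(nullable=True, fields=[
        DataField(0, "source", AtomicType("STRING")),
        DataField(1, "created_at", AtomicType("BIGINT")),
        DataField(2, "location", location),
    ])

    # Paimon ROW -> pyarrow struct (new RowType branch in from_paimon_type).
    pa_struct = PyarrowFieldParser.from_paimon_type(metadata)
    assert pa.types.is_struct(pa_struct)

    # pyarrow struct -> Paimon ROW (new is_struct branch in to_paimon_type).
    roundtrip = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True)
    assert isinstance(roundtrip, RowType)
    assert isinstance(roundtrip.fields[2].type, RowType)

Note that in the to_paimon_type struct branch, field ids are regenerated
positionally (id=i from enumerate), so a roundtrip preserves names, types, and
nullability rather than the original field ids.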