This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new fcc845a27d [python] add support for ROW/STRUCT types in PyPaimon (#7129)
fcc845a27d is described below
commit fcc845a27de4324ebd661847e6bbf89550e4ce14
Author: ChengHui Chen <[email protected]>
AuthorDate: Tue Jan 27 17:29:33 2026 +0800
[python] add support for ROW/STRUCT types in PyPaimon (#7129)
---
.github/workflows/e2e-tests-flink-1.x.yml | 1 +
.github/workflows/e2e-tests-flink-2.x-jdk11.yml | 1 +
.github/workflows/utitcase-flink-1.x-common.yml | 1 +
.github/workflows/utitcase-flink-1.x-others.yml | 1 +
.github/workflows/utitcase-flink-2.x-jdk11.yml | 1 +
.github/workflows/utitcase-jdk11.yml | 1 +
.github/workflows/utitcase-spark-3.x.yml | 1 +
.github/workflows/utitcase-spark-4.x.yml | 1 +
.github/workflows/utitcase.yml | 1 +
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 201 ++++++++++++---------
paimon-python/pypaimon/schema/data_types.py | 17 ++
paimon-python/pypaimon/tests/data_types_test.py | 71 +++++++-
.../pypaimon/tests/e2e/java_py_read_write_test.py | 33 +++-
13 files changed, 245 insertions(+), 86 deletions(-)
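In short, the change maps Paimon's RowType to and from pyarrow struct types. A minimal
sketch of the round trip this enables, based on the PyarrowFieldParser API in the
data_types.py diff below:

    import pyarrow as pa
    from pypaimon.schema.data_types import (AtomicType, DataField, RowType,
                                            PyarrowFieldParser)

    # A nested Paimon row type: ROW<source STRING, created_at BIGINT>.
    metadata = RowType(nullable=True, fields=[
        DataField(0, "source", AtomicType("STRING")),
        DataField(1, "created_at", AtomicType("BIGINT")),
    ])

    # Paimon -> pyarrow: a RowType now becomes a pyarrow struct type.
    pa_struct = PyarrowFieldParser.from_paimon_type(metadata)
    assert pa.types.is_struct(pa_struct)

    # pyarrow -> Paimon: struct fields map back to DataFields.
    row_type = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True)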
diff --git a/.github/workflows/e2e-tests-flink-1.x.yml b/.github/workflows/e2e-tests-flink-1.x.yml
index 476b17ec3e..47033b9f6e 100644
--- a/.github/workflows/e2e-tests-flink-1.x.yml
+++ b/.github/workflows/e2e-tests-flink-1.x.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
index 941fcc0e86..49ccc7e99a 100644
--- a/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
+++ b/.github/workflows/e2e-tests-flink-2.x-jdk11.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-flink-1.x-common.yml b/.github/workflows/utitcase-flink-1.x-common.yml
index 03a6d1e815..b5f1bab891 100644
--- a/.github/workflows/utitcase-flink-1.x-common.yml
+++ b/.github/workflows/utitcase-flink-1.x-common.yml
@@ -26,6 +26,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-flink-1.x-others.yml b/.github/workflows/utitcase-flink-1.x-others.yml
index 3c21037c68..0a1d9715c6 100644
--- a/.github/workflows/utitcase-flink-1.x-others.yml
+++ b/.github/workflows/utitcase-flink-1.x-others.yml
@@ -26,6 +26,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-flink-2.x-jdk11.yml b/.github/workflows/utitcase-flink-2.x-jdk11.yml
index 4be7d4ae62..129520a0b5 100644
--- a/.github/workflows/utitcase-flink-2.x-jdk11.yml
+++ b/.github/workflows/utitcase-flink-2.x-jdk11.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-jdk11.yml b/.github/workflows/utitcase-jdk11.yml
index a5db2019a3..931f5e7b3b 100644
--- a/.github/workflows/utitcase-jdk11.yml
+++ b/.github/workflows/utitcase-jdk11.yml
@@ -26,6 +26,7 @@ on:
- '**/*.md'
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 11
diff --git a/.github/workflows/utitcase-spark-3.x.yml b/.github/workflows/utitcase-spark-3.x.yml
index 3fff587799..a08c5bad45 100644
--- a/.github/workflows/utitcase-spark-3.x.yml
+++ b/.github/workflows/utitcase-spark-3.x.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 8
diff --git a/.github/workflows/utitcase-spark-4.x.yml b/.github/workflows/utitcase-spark-4.x.yml
index fc031342c4..50ef7f8bad 100644
--- a/.github/workflows/utitcase-spark-4.x.yml
+++ b/.github/workflows/utitcase-spark-4.x.yml
@@ -27,6 +27,7 @@ on:
- 'paimon-python/**'
- '.github/workflows/paimon-python-checks.yml'
- 'paimon-lucene/**'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 17
diff --git a/.github/workflows/utitcase.yml b/.github/workflows/utitcase.yml
index 52423a026e..915ec0385a 100644
--- a/.github/workflows/utitcase.yml
+++ b/.github/workflows/utitcase.yml
@@ -29,6 +29,7 @@ on:
- 'paimon-lucene/**'
- 'paimon-faiss/**'
- '.github/workflows/faiss-vector-index-tests.yml'
+ - 'paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java'
env:
JDK_VERSION: 8
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index ef9ba888fc..99e14a010f 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -103,60 +103,6 @@ public class JavaPyE2ETest {
         }
     }
 
-    @Test
-    @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
-    public void testJavaWriteReadAppendTable() throws Exception {
-        for (String format : Arrays.asList("parquet", "orc", "avro")) {
-            Identifier identifier = identifier("mixed_test_append_tablej_" + format);
-            Schema schema =
-                    Schema.newBuilder()
-                            .column("id", DataTypes.INT())
-                            .column("name", DataTypes.STRING())
-                            .column("category", DataTypes.STRING())
-                            .column("value", DataTypes.DOUBLE())
-                            .column("ts", DataTypes.TIMESTAMP())
-                            .column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
-                            .partitionKeys("category")
-                            .option("dynamic-partition-overwrite", "false")
-                            .option("file.format", format)
-                            .build();
-
-            catalog.createTable(identifier, schema, true);
-            Table table = catalog.getTable(identifier);
-            FileStoreTable fileStoreTable = (FileStoreTable) table;
-
-            try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
-                    InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
-
-                write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
-                write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
-                write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
-                write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
-                write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
-                write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
-
-                commit.commit(0, write.prepareCommit(true, 0));
-            }
-
-            List<Split> splits =
-                    new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
-            TableRead read = fileStoreTable.newRead();
-            List<String> res =
-                    getResult(
-                            read,
-                            splits,
-                            row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
-            assertThat(res)
-                    .containsExactlyInAnyOrder(
-                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
-                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
-                            "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
-                            "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
-                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
-                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
-        }
-    }
-
     @Test
     @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
     public void testReadAppendTable() throws Exception {
@@ -189,6 +135,21 @@ public class JavaPyE2ETest {
                             .column("value", DataTypes.DOUBLE())
                             .column("ts", DataTypes.TIMESTAMP())
                             .column("ts_ltz", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+                            .column(
+                                    "metadata",
+                                    DataTypes.ROW(
+                                            DataTypes.FIELD(0, "source", DataTypes.STRING()),
+                                            DataTypes.FIELD(1, "created_at", DataTypes.BIGINT()),
+                                            DataTypes.FIELD(
+                                                    2,
+                                                    "location",
+                                                    DataTypes.ROW(
+                                                            DataTypes.FIELD(
+                                                                    0, "city", DataTypes.STRING()),
+                                                            DataTypes.FIELD(
+                                                                    1,
+                                                                    "country",
+                                                                    DataTypes.STRING())))))
                             .primaryKey("id")
                             .partitionKeys("category")
                             .option("dynamic-partition-overwrite", "false")
@@ -203,12 +164,54 @@
 
             try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
                     InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
-                write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
-                write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
-                write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
-                write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
-                write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
-                write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
+                write.write(
+                        createRow7Cols(
+                                1, "Apple", "Fruit", 1.5, 1000000L, 2000000L, "store1", 1001L,
+                                "Beijing", "China"));
+                write.write(
+                        createRow7Cols(
+                                2,
+                                "Banana",
+                                "Fruit",
+                                0.8,
+                                1000001L,
+                                2000001L,
+                                "store1",
+                                1002L,
+                                "Shanghai",
+                                "China"));
+                write.write(
+                        createRow7Cols(
+                                3,
+                                "Carrot",
+                                "Vegetable",
+                                0.6,
+                                1000002L,
+                                2000002L,
+                                "store2",
+                                1003L,
+                                "Tokyo",
+                                "Japan"));
+                write.write(
+                        createRow7Cols(
+                                4,
+                                "Broccoli",
+                                "Vegetable",
+                                1.2,
+                                1000003L,
+                                2000003L,
+                                "store2",
+                                1004L,
+                                "Seoul",
+                                "Korea"));
+                write.write(
+                        createRow7Cols(
+                                5, "Chicken", "Meat", 5.0, 1000004L, 2000004L, "store3", 1005L,
+                                "NewYork", "USA"));
+                write.write(
+                        createRow7Cols(
+                                6, "Beef", "Meat", 8.0, 1000005L, 2000005L, "store3", 1006L,
+                                "London", "UK"));
                 commit.commit(0, write.prepareCommit(true, 0));
             }
 
@@ -217,18 +220,15 @@
                     new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
             TableRead read = fileStoreTable.newRead();
             List<String> res =
-                    getResult(
-                            read,
-                            splits,
-                            row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+                    getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
             assertThat(res)
                     .containsExactlyInAnyOrder(
-                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
-                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
-                            "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
-                            "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
-                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
-                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
+                            "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]",
+                            "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]",
+                            "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]",
+                            "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
+                            "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
+                            "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
         }
     }
@@ -377,22 +377,22 @@
                     new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
             TableRead read = fileStoreTable.newRead();
             List<String> res =
-                    getResult(
-                            read,
-                            splits,
-                            row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+                    getResult(read, splits, row -> rowToStringWithStruct(row, table.rowType()));
             System.out.println("Result for " + format + " : " + res);
             assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
             assertThat(table.rowType().getFieldTypes().get(5))
                     .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+            assertThat(table.rowType().getFieldTypes().get(6)).isInstanceOf(RowType.class);
+            RowType metadataType = (RowType) table.rowType().getFieldTypes().get(6);
+            assertThat(metadataType.getFieldTypes().get(2)).isInstanceOf(RowType.class);
             assertThat(res)
                     .containsExactlyInAnyOrder(
-                            "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20",
-                            "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001",
-                            "3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
-                            "4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
-                            "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004",
-                            "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005");
+                            "+I[1, Apple, Fruit, 1.5, 1970-01-01T00:16:40, 1970-01-01T00:33:20, (store1, 1001, (Beijing, China))]",
+                            "+I[2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001, 1970-01-01T00:33:20.001, (store1, 1002, (Shanghai, China))]",
+                            "+I[3, Carrot, Vegetable, 0.6, 1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002, (store2, 1003, (Tokyo, Japan))]",
+                            "+I[4, Broccoli, Vegetable, 1.2, 1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003, (store2, 1004, (Seoul, Korea))]",
+                            "+I[5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004, 1970-01-01T00:33:20.004, (store3, 1005, (NewYork, USA))]",
+                            "+I[6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005, 1970-01-01T00:33:20.005, (store3, 1006, (London, UK))]");
         }
     }
 
@@ -424,15 +424,30 @@
                 FileIOFinder.find(tablePath), tablePath, tableSchema, CatalogEnvironment.empty());
     }
 
-    private static InternalRow createRow6Cols(
-            int id, String name, String category, double value, long ts, long tsLtz) {
+    private static InternalRow createRow7Cols(
+            int id,
+            String name,
+            String category,
+            double value,
+            long ts,
+            long tsLtz,
+            String metadataSource,
+            long metadataCreatedAt,
+            String city,
+            String country) {
+        GenericRow locationRow =
+                GenericRow.of(BinaryString.fromString(city), BinaryString.fromString(country));
+        GenericRow metadataRow =
+                GenericRow.of(
+                        BinaryString.fromString(metadataSource), metadataCreatedAt, locationRow);
         return GenericRow.of(
                 id,
                 BinaryString.fromString(name),
                 BinaryString.fromString(category),
                 value,
                 org.apache.paimon.data.Timestamp.fromEpochMillis(ts),
-                org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz));
+                org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz),
+                metadataRow);
     }
 
     protected GenericRow createRow3Cols(Object... values) {
@@ -442,4 +457,24 @@ public class JavaPyE2ETest {
     protected GenericRow createRow3ColsWithKind(RowKind rowKind, Object... values) {
         return GenericRow.ofKind(rowKind, values[0], values[1], values[2]);
     }
+
+    private static String rowToStringWithStruct(InternalRow row, RowType type) {
+        StringBuilder build = new StringBuilder();
+        build.append(row.getRowKind().shortString()).append("[");
+        for (int i = 0; i < type.getFieldCount(); i++) {
+            if (i != 0) {
+                build.append(", ");
+            }
+            if (row.isNullAt(i)) {
+                build.append("NULL");
+            } else {
+                InternalRow.FieldGetter fieldGetter =
+                        InternalRow.createFieldGetter(type.getTypeAt(i), i);
+                Object field = fieldGetter.getFieldOrNull(row);
+                build.append(DataFormatTestUtil.getDataFieldString(field, type.getTypeAt(i)));
+            }
+        }
+        build.append("]");
+        return build.toString();
+    }
 }
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 51787c7dcb..318ddfe02f 100755
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -472,6 +472,12 @@ class PyarrowFieldParser:
             key_type = PyarrowFieldParser.from_paimon_type(data_type.key)
             value_type = PyarrowFieldParser.from_paimon_type(data_type.value)
             return pyarrow.map_(key_type, value_type)
+        elif isinstance(data_type, RowType):
+            pa_fields = []
+            for field in data_type.fields:
+                pa_field_type = PyarrowFieldParser.from_paimon_type(field.type)
+                pa_fields.append(pyarrow.field(field.name, pa_field_type, nullable=field.type.nullable))
+            return pyarrow.struct(pa_fields)
         raise ValueError("Unsupported data type: {}".format(data_type))
 
     @staticmethod
@@ -539,6 +545,17 @@ class PyarrowFieldParser:
             key_type = PyarrowFieldParser.to_paimon_type(pa_type.key_type, nullable)
             value_type = PyarrowFieldParser.to_paimon_type(pa_type.item_type, nullable)
             return MapType(nullable, key_type, value_type)
+        elif types.is_struct(pa_type):
+            pa_type: pyarrow.StructType
+            fields = []
+            for i, pa_field in enumerate(pa_type):
+                field_type = PyarrowFieldParser.to_paimon_type(pa_field.type, pa_field.nullable)
+                fields.append(DataField(
+                    id=i,
+                    name=pa_field.name,
+                    type=field_type
+                ))
+            return RowType(nullable, fields)
         if type_name is not None:
             return AtomicType(type_name, nullable)
         raise ValueError("Unsupported pyarrow type: {}".format(pa_type))
diff --git a/paimon-python/pypaimon/tests/data_types_test.py b/paimon-python/pypaimon/tests/data_types_test.py
index 53644e24c5..6ace872997 100755
--- a/paimon-python/pypaimon/tests/data_types_test.py
+++ b/paimon-python/pypaimon/tests/data_types_test.py
@@ -17,8 +17,10 @@ limitations under the License.
 """
 import unittest
 from parameterized import parameterized
+import pyarrow as pa
 
-from pypaimon.schema.data_types import DataField, AtomicType, ArrayType, MultisetType, MapType, RowType
+from pypaimon.schema.data_types import (DataField, AtomicType, ArrayType, MultisetType, MapType,
+                                        RowType, PyarrowFieldParser)
 
 
 class DataTypesTest(unittest.TestCase):
@@ -65,3 +67,70 @@ class DataTypesTest(unittest.TestCase):
                                   DataField(1, "b", AtomicType("TIMESTAMP(6)"),)])
         self.assertEqual(str(row_data),
                          str(RowType.from_dict(row_data.to_dict())))
+
+    def test_struct_from_paimon_to_pyarrow(self):
+        paimon_row = RowType(
+            nullable=True,
+            fields=[
+                DataField(0, "field1", AtomicType("INT")),
+                DataField(1, "field2", AtomicType("STRING")),
+                DataField(2, "field3", AtomicType("DOUBLE"))
+            ]
+        )
+        pa_struct = PyarrowFieldParser.from_paimon_type(paimon_row)
+
+        self.assertTrue(pa.types.is_struct(pa_struct))
+        self.assertEqual(len(pa_struct), 3)
+        self.assertEqual(pa_struct[0].name, "field1")
+        self.assertEqual(pa_struct[1].name, "field2")
+        self.assertEqual(pa_struct[2].name, "field3")
+        self.assertTrue(pa.types.is_int32(pa_struct[0].type))
+        self.assertTrue(pa.types.is_string(pa_struct[1].type))
+        self.assertTrue(pa.types.is_float64(pa_struct[2].type))
+
+    def test_struct_from_pyarrow_to_paimon(self):
+        pa_struct = pa.struct([
+            pa.field("name", pa.string()),
+            pa.field("age", pa.int32()),
+            pa.field("score", pa.float64())
+        ])
+        paimon_row = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True)
+
+        self.assertIsInstance(paimon_row, RowType)
+        self.assertTrue(paimon_row.nullable)
+        self.assertEqual(len(paimon_row.fields), 3)
+        self.assertEqual(paimon_row.fields[0].name, "name")
+        self.assertEqual(paimon_row.fields[1].name, "age")
+        self.assertEqual(paimon_row.fields[2].name, "score")
+        self.assertEqual(paimon_row.fields[0].type.type, "STRING")
+        self.assertEqual(paimon_row.fields[1].type.type, "INT")
+        self.assertEqual(paimon_row.fields[2].type.type, "DOUBLE")
+
+    def test_nested_field_roundtrip(self):
+        nested_field = RowType(
+            nullable=True,
+            fields=[
+                DataField(0, "inner_field1", AtomicType("STRING")),
+                DataField(1, "inner_field2", AtomicType("INT"))
+            ]
+        )
+        paimon_row = RowType(
+            nullable=True,
+            fields=[
+                DataField(0, "outer_field1", AtomicType("BIGINT")),
+                DataField(1, "nested", nested_field)
+            ]
+        )
+        pa_struct = PyarrowFieldParser.from_paimon_type(paimon_row)
+
+        converted_paimon_row = PyarrowFieldParser.to_paimon_type(pa_struct, nullable=True)
+        self.assertIsInstance(converted_paimon_row, RowType)
+        self.assertEqual(len(converted_paimon_row.fields), 2)
+        self.assertEqual(converted_paimon_row.fields[0].name, "outer_field1")
+        self.assertEqual(converted_paimon_row.fields[1].name, "nested")
+
+        converted_nested_field = converted_paimon_row.fields[1].type
+        self.assertIsInstance(converted_nested_field, RowType)
+        self.assertEqual(len(converted_nested_field.fields), 2)
+        self.assertEqual(converted_nested_field.fields[0].name, "inner_field1")
+        self.assertEqual(converted_nested_field.fields[1].name, "inner_field2")
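Assuming a standard pytest setup for paimon-python, the unit tests above can be
run locally with something like:

    python -m pytest paimon-python/pypaimon/tests/data_types_test.py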
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 71e1fbb217..b56ced657c 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -127,7 +127,15 @@ class JavaPyReadWriteTest(unittest.TestCase):
             ('category', pa.string()),
             ('value', pa.float64()),
             ('ts', pa.timestamp('us')),
-            ('ts_ltz', pa.timestamp('us', tz='UTC'))
+            ('ts_ltz', pa.timestamp('us', tz='UTC')),
+            ('metadata', pa.struct([
+                pa.field('source', pa.string()),
+                pa.field('created_at', pa.int64()),
+                pa.field('location', pa.struct([
+                    pa.field('city', pa.string()),
+                    pa.field('country', pa.string())
+                ]))
+            ]))
         ])
 
         table_name = f'default.mixed_test_pk_tablep_{file_format}'
@@ -169,7 +177,15 @@ class JavaPyReadWriteTest(unittest.TestCase):
             'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
             'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
             'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'),
-            'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True)
+            'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True),
+            'metadata': [
+                {'source': 'store1', 'created_at': 1001, 'location': {'city': 'Beijing', 'country': 'China'}},
+                {'source': 'store1', 'created_at': 1002, 'location': {'city': 'Shanghai', 'country': 'China'}},
+                {'source': 'store2', 'created_at': 1003, 'location': {'city': 'Tokyo', 'country': 'Japan'}},
+                {'source': 'store2', 'created_at': 1004, 'location': {'city': 'Seoul', 'country': 'Korea'}},
+                {'source': 'store3', 'created_at': 1005, 'location': {'city': 'NewYork', 'country': 'USA'}},
+                {'source': 'store3', 'created_at': 1006, 'location': {'city': 'London', 'country': 'UK'}}
+            ]
         })
         write_builder = table.new_batch_write_builder()
         table_write = write_builder.new_write()
@@ -210,11 +226,24 @@ class JavaPyReadWriteTest(unittest.TestCase):
         if file_format != "lance":
             self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)")
             self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH LOCAL TIME ZONE")
+            from pypaimon.schema.data_types import RowType
+            self.assertIsInstance(table.fields[6].type, RowType)
+            metadata_fields = table.fields[6].type.fields
+            self.assertEqual(len(metadata_fields), 3)
+            self.assertEqual(metadata_fields[0].name, 'source')
+            self.assertEqual(metadata_fields[1].name, 'created_at')
+            self.assertEqual(metadata_fields[2].name, 'location')
+            self.assertIsInstance(metadata_fields[2].type, RowType)
+
         # Data order may vary due to partitioning/bucketing, so compare as sets
         expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
         actual_names = set(res['name'].tolist())
         self.assertEqual(actual_names, expected_names)
 
+        # Verify metadata column can be read and contains nested structures
+        if 'metadata' in res.columns:
+            self.assertFalse(res['metadata'].isnull().all())
+
         # For primary key tables, verify that _VALUE_KIND is written correctly
         # by checking if we can read the raw data with system fields
         # Note: Normal read filters out system fields, so we verify through Java read
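For completeness, a sketch of reading such a table back from Python, assuming
the usual PyPaimon read-builder flow used elsewhere in this test suite; struct
columns come back as Python dicts in the pandas result:

    read_builder = table.new_read_builder()
    splits = read_builder.new_scan().plan().splits()
    df = read_builder.new_read().to_pandas(splits)
    # e.g. df['metadata'][0] ->
    #   {'source': 'store1', 'created_at': 1001,
    #    'location': {'city': 'Beijing', 'country': 'China'}}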