This is an automated email from the ASF dual-hosted git repository.

yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ae7a19f416 [flink] Verify Read & Write Shredding Variant (#6623)
ae7a19f416 is described below

commit ae7a19f416a53381219c9285e9f3623a85408961
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Nov 19 10:34:39 2025 +0800

    [flink] Verify Read & Write Shredding Variant (#6623)
---
 .../apache/paimon/flink/CatalogTableITCase.java    | 38 ++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 3d7429177a..8c172a95f4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
 import org.apache.flink.types.variant.Variant;
 import org.apache.flink.types.variant.VariantBuilder;
 import org.junit.jupiter.api.Test;
@@ -40,12 +41,14 @@ import org.junit.jupiter.api.condition.EnabledIf;
 import javax.annotation.Nonnull;
 
 import java.util.HashMap;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static org.apache.flink.types.RowUtils.createRowWithNamedPositions;
 import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
 import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
@@ -1260,6 +1263,41 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         Row.of(builder.of("hello")));
     }
 
+    @Test
+    @EnabledIf("isFlink2_1OrAbove")
+    void testReadWriteShreddingVariant() {
+        sql(
+                "CREATE TABLE t (v VARIANT) WITH ("
+                        + "'parquet.variant.shreddingSchema' =\n"
+                        + "'{\"type\":\"ROW\",\"fields\":["
+                        + "   {\"name\":\"v\",\"type\":"
+                        + "       {\"type\":\"ROW\",\"fields\":["
+                        + "           {\"name\":\"age\",\"type\":\"INT\"},"
+                        + "           {\"name\":\"city\",\"type\":\"STRING\"}]"
+                        + "       }"
+                        + "   }]"
+                        + "}'"
+                        + ")");
+
+        sql(
+                "INSERT INTO t SELECT PARSE_JSON(s) FROM (VALUES ('{\"age\":27,\"city\":\"Beijing\"}')) AS T(s)");
+
+        List<Row> rows = sql("SELECT * FROM t");
+
+        VariantBuilder builder = Variant.newBuilder();
+        Variant expectedVariant =
+                builder.object()
+                        .add("age", builder.of((byte) 27))
+                        .add("city", builder.of("Beijing"))
+                        .build();
+        LinkedHashMap<String, Integer> positionByNames = new LinkedHashMap<>();
+        positionByNames.put("v", 0);
+        Row expectedRow =
+                createRowWithNamedPositions(
+                        RowKind.INSERT, new Object[] {expectedVariant}, positionByNames);
+        assertThat(rows).containsExactlyInAnyOrder(expectedRow);
+    }
+
     private void innerTestReadOptimizedTableAndCheckData(String insertTableName) {
         // full compaction will always be performed at the end of batch jobs, as long as
         // full-compaction.delta-commits is set, regardless of its value

Reply via email to