This is an automated email from the ASF dual-hosted git repository.
yunfengzhou pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ae7a19f416 [flink] Verify Read & Write Shredding Variant (#6623)
ae7a19f416 is described below
commit ae7a19f416a53381219c9285e9f3623a85408961
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Wed Nov 19 10:34:39 2025 +0800
[flink] Verify Read & Write Shredding Variant (#6623)
---
.../apache/paimon/flink/CatalogTableITCase.java | 38 ++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
index 3d7429177a..8c172a95f4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/CatalogTableITCase.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.PartitionNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
import org.apache.flink.types.variant.Variant;
import org.apache.flink.types.variant.VariantBuilder;
import org.junit.jupiter.api.Test;
@@ -40,12 +41,14 @@ import org.junit.jupiter.api.condition.EnabledIf;
import javax.annotation.Nonnull;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
+import static org.apache.flink.types.RowUtils.createRowWithNamedPositions;
import static org.apache.paimon.catalog.Catalog.LAST_UPDATE_TIME_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_FILES_PROP;
import static org.apache.paimon.catalog.Catalog.NUM_ROWS_PROP;
@@ -1260,6 +1263,41 @@ public class CatalogTableITCase extends CatalogITCaseBase {
Row.of(builder.of("hello")));
}
+ @Test
+ @EnabledIf("isFlink2_1OrAbove")
+ void testReadWriteShreddingVariant() {
+ sql(
+ "CREATE TABLE t (v VARIANT) WITH ("
+ + "'parquet.variant.shreddingSchema' =\n"
+ + "'{\"type\":\"ROW\",\"fields\":["
+ + " {\"name\":\"v\",\"type\":"
+ + " {\"type\":\"ROW\",\"fields\":["
+ + " {\"name\":\"age\",\"type\":\"INT\"},"
+ + " {\"name\":\"city\",\"type\":\"STRING\"}]"
+ + " }"
+ + " }]"
+ + "}'"
+ + ")");
+
+ sql(
+                "INSERT INTO t SELECT PARSE_JSON(s) FROM (VALUES ('{\"age\":27,\"city\":\"Beijing\"}')) AS T(s)");
+
+ List<Row> rows = sql("SELECT * FROM t");
+
+ VariantBuilder builder = Variant.newBuilder();
+ Variant expectedVariant =
+ builder.object()
+ .add("age", builder.of((byte) 27))
+ .add("city", builder.of("Beijing"))
+ .build();
+ LinkedHashMap<String, Integer> positionByNames = new LinkedHashMap<>();
+ positionByNames.put("v", 0);
+ Row expectedRow =
+ createRowWithNamedPositions(
+                        RowKind.INSERT, new Object[] {expectedVariant}, positionByNames);
+ assertThat(rows).containsExactlyInAnyOrder(expectedRow);
+ }
+
    private void innerTestReadOptimizedTableAndCheckData(String insertTableName) {
        // full compaction will always be performed at the end of batch jobs, as long as
// full-compaction.delta-commits is set, regardless of its value