This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-2.3 by this push:
     new 0ab06caa26c [FLINK-39504][table-planner] Handle special chars in VARIANT ITEM call
0ab06caa26c is described below

commit 0ab06caa26ca6b429d3277fa9a573dbfc8f5de69
Author: Timo Walther <[email protected]>
AuthorDate: Tue Apr 21 16:13:25 2026 +0200

    [FLINK-39504][table-planner] Handle special chars in VARIANT ITEM call
    
    This closes #27985.
---
 .../planner/codegen/calls/ScalarOperatorGens.scala |   7 +-
 .../nodes/exec/stream/VariantSemanticTest.java     | 405 ++++++++++-----------
 2 files changed, 198 insertions(+), 214 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index dff256101ac..28c0e06ca97 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.table.planner.codegen.calls
 
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableRuntimeException, ValidationException}
 import org.apache.flink.table.api.config.ExecutionConfigOptions
 import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryStringData}
 import org.apache.flink.table.data.util.MapDataUtil
@@ -1740,16 +1740,17 @@ object ScalarOperatorGens {
       resultTerm: String,
       nullTerm: String,
       tmpValue: String): String = {
+    val escapedFieldName = EncodingUtils.escapeJava(fieldName)
     s"""
        |  if ($variantTerm.isObject()){
-       |    $variantTypeTerm $tmpValue = $variantTerm.getField("$fieldName");
+       |    $variantTypeTerm $tmpValue = $variantTerm.getField("$escapedFieldName");
        |    if ($tmpValue == null) {
        |      $nullTerm = true;
        |    } else {
        |      $resultTerm = $tmpValue;
        |    }
        |  } else {
-       |    throw new org.apache.flink.table.api.TableRuntimeException("String key access on variant requires an object variant, but a non-object variant was provided.");
+       |    throw new ${className[TableRuntimeException]}("String key access on variant requires an object variant, but a non-object variant was provided.");
        |  }
     """.stripMargin
   }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
index 1aa4ad1af0f..af95ee23a65 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
@@ -111,67 +111,58 @@ public class VariantSemanticTest extends SemanticTestBase {
                     .runSql("INSERT INTO sink_t SELECT TRY_PARSE_JSON(v) FROM 
t")
                     .build();
 
-    static final TableTestProgram BUILTIN_AGG_WITH_RETRACTION;
-
-    static final TableTestProgram BUILTIN_AGG;
-
-    static {
-        Variant v1 = BUILDER.of(1);
-        Variant v2 = BUILDER.of(2);
-
-        BUILTIN_AGG =
-                TableTestProgram.of("builtin-agg", "validates builtin agg")
-                        .setupTableSource(
-                                SourceTestStep.newBuilder("t")
-                                        .addSchema("v VARIANT")
-                                        .producedValues(Row.of(v1), 
Row.of(v2), Row.of(v2))
-                                        .build())
-                        .setupTableSink(
-                                SinkTestStep.newBuilder("sink_t")
-                                        .addSchema(
-                                                "fv VARIANT", "lv VARIANT", "c 
BIGINT", "dc BIGINT")
-                                        .consumedValues(
-                                                Row.of(v1, v1, 1, 1),
-                                                
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v1, 1, 1),
-                                                
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 2, 2),
-                                                
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v2, 2, 2),
-                                                
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 3, 2))
-                                        .build())
-                        .runSql(
-                                "INSERT INTO sink_t SELECT FIRST_VALUE(v), 
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
-                        .build();
-
-        BUILTIN_AGG_WITH_RETRACTION =
-                TableTestProgram.of(
-                                "builtin-agg-with-retraction",
-                                "validates builtin agg with retraction")
-                        .setupTableSource(
-                                SourceTestStep.newBuilder("t")
-                                        .addSchema("v VARIANT")
-                                        .addOption("changelog-mode", 
"I,UB,UA,D")
-                                        .producedValues(
-                                                Row.of(v1),
-                                                Row.of(v2),
-                                                Row.of(v2),
-                                                Row.ofKind(RowKind.DELETE, v1))
-                                        .build())
-                        .setupTableSink(
-                                SinkTestStep.newBuilder("sink_t")
-                                        .addSchema(
-                                                "fv VARIANT", "lv VARIANT", "c 
BIGINT", "dc BIGINT")
-                                        .consumedValues(
-                                                Row.of(v1, v1, 1, 1),
-                                                
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v1, 1, 1),
-                                                
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 2, 2),
-                                                
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v2, 2, 2),
-                                                
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 3, 2),
-                                                
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v2, 3, 2),
-                                                
Row.ofKind(RowKind.UPDATE_AFTER, v2, v2, 2, 1))
-                                        .build())
-                        .runSql(
-                                "INSERT INTO sink_t SELECT FIRST_VALUE(v), 
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
-                        .build();
-    }
+    static final Variant V1 = BUILDER.of(1);
+    static final Variant V2 = BUILDER.of(2);
+
+    static final TableTestProgram BUILTIN_AGG =
+            TableTestProgram.of("builtin-agg", "validates builtin agg")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("v VARIANT")
+                                    .producedValues(Row.of(V1), Row.of(V2), 
Row.of(V2))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("fv VARIANT", "lv VARIANT", "c 
BIGINT", "dc BIGINT")
+                                    .consumedValues(
+                                            Row.of(V1, V1, 1, 1),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
V1, V1, 1, 1),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
V1, V2, 2, 2),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
V1, V2, 2, 2),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
V1, V2, 3, 2))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT FIRST_VALUE(v), 
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
+                    .build();
+
+    static final TableTestProgram BUILTIN_AGG_WITH_RETRACTION =
+            TableTestProgram.of(
+                            "builtin-agg-with-retraction", "validates builtin 
agg with retraction")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("v VARIANT")
+                                    .addOption("changelog-mode", "I,UB,UA,D")
+                                    .producedValues(
+                                            Row.of(V1),
+                                            Row.of(V2),
+                                            Row.of(V2),
+                                            Row.ofKind(RowKind.DELETE, V1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema("fv VARIANT", "lv VARIANT", "c 
BIGINT", "dc BIGINT")
+                                    .consumedValues(
+                                            Row.of(V1, V1, 1, 1),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
V1, V1, 1, 1),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
V1, V2, 2, 2),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
V1, V2, 2, 2),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
V1, V2, 3, 2),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
V1, V2, 3, 2),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
V2, V2, 2, 1))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT FIRST_VALUE(v), 
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
+                    .build();
 
     static final TableTestProgram VARIANT_AS_UDF_ARG =
             TableTestProgram.of("variant-as-udf-arg", "validates variant as 
udf argument")
@@ -256,18 +247,6 @@ public class VariantSemanticTest extends SemanticTestBase {
                     .runSql("INSERT INTO sink_t SELECT k, SUM(v) AS total FROM 
t GROUP BY k")
                     .build();
 
-    static final TableTestProgram VARIANT_ARRAY_ACCESS;
-
-    static final TableTestProgram 
VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES;
-
-    static final TableTestProgram VARIANT_OBJECT_ACCESS;
-
-    static final TableTestProgram VARIANT_NESTED_ACCESS;
-
-    static final TableTestProgram VARIANT_ARRAY_ERROR_ACCESS;
-
-    static final TableTestProgram VARIANT_OBJECT_ERROR_ACCESS;
-
     public static final SourceTestStep VARIANT_ARRAY_SOURCE =
             SourceTestStep.newBuilder("t")
                     .addSchema("v VARIANT")
@@ -322,148 +301,152 @@ public class VariantSemanticTest extends SemanticTestBase {
                             new Row(3))
                     .build();
 
-    public static final SourceTestStep VARIANT_NESTED_SOURCE =
-            SourceTestStep.newBuilder("t")
-                    .addSchema("v VARIANT")
-                    .producedValues(
-                            Row.of(
-                                    BUILDER.object()
-                                            .add(
-                                                    "users",
-                                                    BUILDER.array()
-                                                            .add(
-                                                                    
BUILDER.object()
-                                                                            
.add(
-                                                                               
     "id",
-                                                                               
     BUILDER.of(1))
-                                                                            
.add(
-                                                                               
     "name",
-                                                                               
     BUILDER.of(
-                                                                               
             "Alice"))
-                                                                            
.build())
-                                                            .build())
-                                            .build()),
-                            new Row(1))
+    static final TableTestProgram VARIANT_ARRAY_ACCESS =
+            TableTestProgram.of(
+                            "variant-array-access",
+                            "validates variant array access using [] operator 
in sql and at() in table api")
+                    .setupTableSource(VARIANT_ARRAY_SOURCE)
+                    .setupTableSink(VARIANT_ARRAY_SINK)
+                    .runSql("INSERT INTO sink_t SELECT v[1], v[2], v[3] FROM 
t")
+                    .runTableApi(
+                            t ->
+                                    t.from("t")
+                                            .select(
+                                                    $("v").at(1).as("v1"),
+                                                    $("v").at(2).as("v2"),
+                                                    $("v").at(3).as("v3")),
+                            "sink_t")
                     .build();
 
-    public static final SinkTestStep VARIANT_NESTED_SINK =
-            SinkTestStep.newBuilder("sink_t")
-                    .addSchema("user_id VARIANT", "user_name VARIANT")
-                    .consumedValues(Row.of(BUILDER.of(1), 
BUILDER.of("Alice")), new Row(2))
+    static final TableTestProgram 
VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES =
+            TableTestProgram.of(
+                            "variant-array-access-with-different-index-types",
+                            "validates variant array access with different 
index types using [] operator in sql and at() in table api")
+                    .setupTableSource(VARIANT_ARRAY_SOURCE)
+                    .setupTableSink(VARIANT_ARRAY_SINK)
+                    .runSql(
+                            "INSERT INTO sink_t SELECT v[cast(1 as tinyint)], 
v[cast(2 as smallint)], v[cast(4 as bigint)] FROM t")
+                    .runTableApi(
+                            t ->
+                                    t.from("t")
+                                            .select(
+                                                    $("v").at((byte) 
1).as("v1"),
+                                                    $("v").at((short) 
2).as("v2"),
+                                                    $("v").at((long) 
3).as("v3")),
+                            "sink_t")
                     .build();
 
-    static {
-        VARIANT_ARRAY_ACCESS =
-                TableTestProgram.of(
-                                "variant-array-access",
-                                "validates variant array access using [] 
operator in sql and at() in table api")
-                        .setupTableSource(VARIANT_ARRAY_SOURCE)
-                        .setupTableSink(VARIANT_ARRAY_SINK)
-                        .runSql("INSERT INTO sink_t SELECT v[1], v[2], v[3] 
FROM t")
-                        .runTableApi(
-                                t ->
-                                        t.from("t")
-                                                .select(
-                                                        $("v").at(1).as("v1"),
-                                                        $("v").at(2).as("v2"),
-                                                        $("v").at(3).as("v3")),
-                                "sink_t")
-                        .build();
-
-        VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES =
-                TableTestProgram.of(
-                                
"variant-array-access-with-different-index-types",
-                                "validates variant array access with different 
index types using [] operator in sql and at() in table api")
-                        .setupTableSource(VARIANT_ARRAY_SOURCE)
-                        .setupTableSink(VARIANT_ARRAY_SINK)
-                        .runSql(
-                                "INSERT INTO sink_t SELECT v[cast(1 as 
tinyint)], v[cast(2 as smallint)], v[cast(4 as bigint)] FROM t")
-                        .runTableApi(
-                                t ->
-                                        t.from("t")
-                                                .select(
-                                                        $("v").at((byte) 
1).as("v1"),
-                                                        $("v").at((short) 
2).as("v2"),
-                                                        $("v").at((long) 
3).as("v3")),
-                                "sink_t")
-                        .build();
-
-        VARIANT_OBJECT_ACCESS =
-                TableTestProgram.of(
-                                "variant-object-access",
-                                "validates variant object field access using 
[] operator in sql and at() in table api")
-                        .setupTableSource(VARIANT_OBJECT_SOURCE)
-                        .setupTableSink(VARIANT_OBJECT_SINK)
-                        .runSql("INSERT INTO sink_t SELECT v['name'], 
v['age'], v['city'] FROM t")
-                        .runTableApi(
-                                t ->
-                                        t.from("t")
-                                                .select(
-                                                        
$("v").at("name").as("name"),
-                                                        
$("v").at("age").as("age"),
-                                                        
$("v").at("city").as("city")),
-                                "sink_t")
-                        .build();
-
-        VARIANT_NESTED_ACCESS =
-                TableTestProgram.of(
-                                "variant-nested-access",
-                                "validates variant nested access using [] 
operator in sql and at() in table api")
-                        .setupTableSource(VARIANT_NESTED_SOURCE)
-                        .setupTableSink(VARIANT_NESTED_SINK)
-                        .runSql(
-                                "INSERT INTO sink_t SELECT 
v['users'][1]['id'], v['users'][1]['name'] FROM t")
-                        .runTableApi(
-                                t ->
-                                        t.from("t")
-                                                .select(
-                                                        $("v").at("users")
-                                                                .at(1)
-                                                                .at("id")
-                                                                .as("user_id"),
-                                                        $("v").at("users")
-                                                                .at(1)
-                                                                .at("name")
-                                                                
.as("user_name")),
-                                "sink_t")
-                        .build();
-
-        VARIANT_ARRAY_ERROR_ACCESS =
-                TableTestProgram.of(
-                                "variant-array-error-access",
-                                "validates variant array access using [] 
operator in sql and at() in table api with string")
-                        .setupTableSource(VARIANT_ARRAY_SOURCE)
-                        .runFailingSql(
-                                "SELECT v['1'], v['2'], v['3'] FROM t",
-                                TableRuntimeException.class,
-                                "String key access on variant requires an 
object variant, but a non-object variant was provided.")
-                        .runFailingSql(
-                                "SELECT v[1.5], v[4.2], v[3.3] FROM t",
-                                ValidationException.class,
-                                "Cannot apply 'ITEM' to arguments of type 
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
-                                        + "<MAP>[<ANY>]\n"
-                                        + "<ROW>[<CHARACTER>|<INTEGER>]\n"
-                                        + "<VARIANT>[<CHARACTER>|<INTEGER>]")
-                        .build();
-
-        VARIANT_OBJECT_ERROR_ACCESS =
-                TableTestProgram.of(
-                                "variant-object-error-access",
-                                "validates variant object field access using 
[] operator in sql and at() in table api")
-                        .setupTableSource(VARIANT_OBJECT_SOURCE)
-                        .runFailingSql(
-                                "SELECT v[1], v[2], v[3] FROM t",
-                                TableRuntimeException.class,
-                                "Integer index access on variant requires an 
array variant, but a non-array variant was provided.")
-                        .runFailingSql(
-                                "SELECT v[1.5], v[4.2], v[3.3] FROM t",
-                                ValidationException.class,
-                                "Cannot apply 'ITEM' to arguments of type 
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
-                                        + "<MAP>[<ANY>]\n"
-                                        + "<ROW>[<CHARACTER>|<INTEGER>]\n"
-                                        + "<VARIANT>[<CHARACTER>|<INTEGER>]")
-                        .build();
-    }
+    static final TableTestProgram VARIANT_OBJECT_ACCESS =
+            TableTestProgram.of(
+                            "variant-object-access",
+                            "validates variant object field access using [] 
operator in sql and at() in table api")
+                    .setupTableSource(VARIANT_OBJECT_SOURCE)
+                    .setupTableSink(VARIANT_OBJECT_SINK)
+                    .runSql("INSERT INTO sink_t SELECT v['name'], v['age'], 
v['city'] FROM t")
+                    .runTableApi(
+                            t ->
+                                    t.from("t")
+                                            .select(
+                                                    
$("v").at("name").as("name"),
+                                                    $("v").at("age").as("age"),
+                                                    
$("v").at("city").as("city")),
+                            "sink_t")
+                    .build();
+
+    static final Variant COMPLEX_VARIANT =
+            BUILDER.object()
+                    .add(
+                            "users",
+                            BUILDER.array()
+                                    .add(
+                                            BUILDER.object()
+                                                    .add("id", BUILDER.of(1))
+                                                    .add("name", 
BUILDER.of("Alice"))
+                                                    .add(
+                                                            "special ' \" \\ 
\u0000",
+                                                            BUILDER.of("Bob"))
+                                                    .build())
+                                    .build())
+                    .build();
+
+    static final TableTestProgram VARIANT_NESTED_ACCESS =
+            TableTestProgram.of(
+                            "variant-nested-access",
+                            "validates variant nested access using [] operator 
in sql and at() in table api")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema("v VARIANT")
+                                    .producedValues(Row.of(COMPLEX_VARIANT), 
new Row(1))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(
+                                            "user_id VARIANT",
+                                            "user_name VARIANT",
+                                            "special_name VARIANT")
+                                    .consumedValues(
+                                            Row.of(
+                                                    BUILDER.of(1),
+                                                    BUILDER.of("Alice"),
+                                                    BUILDER.of("Bob")),
+                                            new Row(3))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink_t SELECT "
+                                    + "v['users'][1]['id'], "
+                                    + "v['users'][1]['name'], "
+                                    + "v['users'][1]['special '' \" \\ 
\u0000'] FROM t")
+                    .runTableApi(
+                            t ->
+                                    t.from("t")
+                                            .select(
+                                                    
$("v").at("users").at(1).at("id").as("user_id"),
+                                                    $("v").at("users")
+                                                            .at(1)
+                                                            .at("name")
+                                                            .as("user_name"),
+                                                    $("v").at("users")
+                                                            .at(1)
+                                                            .at("special ' \" 
\\ \u0000")
+                                                            
.as("special_name")),
+                            "sink_t")
+                    .build();
+
+    static final TableTestProgram VARIANT_ARRAY_ERROR_ACCESS =
+            TableTestProgram.of(
+                            "variant-array-error-access",
+                            "validates variant array access using [] operator 
in sql and at() in table api with string")
+                    .setupTableSource(VARIANT_ARRAY_SOURCE)
+                    .runFailingSql(
+                            "SELECT v['1'], v['2'], v['3'] FROM t",
+                            TableRuntimeException.class,
+                            "String key access on variant requires an object 
variant, but a non-object variant was provided.")
+                    .runFailingSql(
+                            "SELECT v[1.5], v[4.2], v[3.3] FROM t",
+                            ValidationException.class,
+                            "Cannot apply 'ITEM' to arguments of type 
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
+                                    + "<MAP>[<ANY>]\n"
+                                    + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+                                    + "<VARIANT>[<CHARACTER>|<INTEGER>]")
+                    .build();
+
+    static final TableTestProgram VARIANT_OBJECT_ERROR_ACCESS =
+            TableTestProgram.of(
+                            "variant-object-error-access",
+                            "validates variant object field access using [] 
operator in sql and at() in table api")
+                    .setupTableSource(VARIANT_OBJECT_SOURCE)
+                    .runFailingSql(
+                            "SELECT v[1], v[2], v[3] FROM t",
+                            TableRuntimeException.class,
+                            "Integer index access on variant requires an array 
variant, but a non-array variant was provided.")
+                    .runFailingSql(
+                            "SELECT v[1.5], v[4.2], v[3.3] FROM t",
+                            ValidationException.class,
+                            "Cannot apply 'ITEM' to arguments of type 
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
+                                    + "<MAP>[<ANY>]\n"
+                                    + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+                                    + "<VARIANT>[<CHARACTER>|<INTEGER>]")
+                    .build();
 
     @Override
     public List<TableTestProgram> programs() {

Reply via email to