This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch release-2.3
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-2.3 by this push:
new 0ab06caa26c [FLINK-39504][table-planner] Handle special chars in VARIANT ITEM call
0ab06caa26c is described below
commit 0ab06caa26ca6b429d3277fa9a573dbfc8f5de69
Author: Timo Walther <[email protected]>
AuthorDate: Tue Apr 21 16:13:25 2026 +0200
[FLINK-39504][table-planner] Handle special chars in VARIANT ITEM call
This closes #27985.
---
.../planner/codegen/calls/ScalarOperatorGens.scala | 7 +-
.../nodes/exec/stream/VariantSemanticTest.java | 405 ++++++++++-----------
2 files changed, 198 insertions(+), 214 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
index dff256101ac..28c0e06ca97 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/ScalarOperatorGens.scala
@@ -17,7 +17,7 @@
*/
package org.apache.flink.table.planner.codegen.calls
-import org.apache.flink.table.api.ValidationException
+import org.apache.flink.table.api.{TableRuntimeException, ValidationException}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryStringData}
import org.apache.flink.table.data.util.MapDataUtil
@@ -1740,16 +1740,17 @@ object ScalarOperatorGens {
resultTerm: String,
nullTerm: String,
tmpValue: String): String = {
+ val escapedFieldName = EncodingUtils.escapeJava(fieldName)
s"""
| if ($variantTerm.isObject()){
- | $variantTypeTerm $tmpValue = $variantTerm.getField("$fieldName");
+ | $variantTypeTerm $tmpValue =
$variantTerm.getField("$escapedFieldName");
| if ($tmpValue == null) {
| $nullTerm = true;
| } else {
| $resultTerm = $tmpValue;
| }
| } else {
- | throw new org.apache.flink.table.api.TableRuntimeException("String
key access on variant requires an object variant, but a non-object variant was
provided.");
+ | throw new ${className[TableRuntimeException]}("String key access
on variant requires an object variant, but a non-object variant was provided.");
| }
""".stripMargin
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
index 1aa4ad1af0f..af95ee23a65 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/VariantSemanticTest.java
@@ -111,67 +111,58 @@ public class VariantSemanticTest extends SemanticTestBase {
.runSql("INSERT INTO sink_t SELECT TRY_PARSE_JSON(v) FROM
t")
.build();
- static final TableTestProgram BUILTIN_AGG_WITH_RETRACTION;
-
- static final TableTestProgram BUILTIN_AGG;
-
- static {
- Variant v1 = BUILDER.of(1);
- Variant v2 = BUILDER.of(2);
-
- BUILTIN_AGG =
- TableTestProgram.of("builtin-agg", "validates builtin agg")
- .setupTableSource(
- SourceTestStep.newBuilder("t")
- .addSchema("v VARIANT")
- .producedValues(Row.of(v1),
Row.of(v2), Row.of(v2))
- .build())
- .setupTableSink(
- SinkTestStep.newBuilder("sink_t")
- .addSchema(
- "fv VARIANT", "lv VARIANT", "c
BIGINT", "dc BIGINT")
- .consumedValues(
- Row.of(v1, v1, 1, 1),
-
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v1, 1, 1),
-
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 2, 2),
-
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v2, 2, 2),
-
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 3, 2))
- .build())
- .runSql(
- "INSERT INTO sink_t SELECT FIRST_VALUE(v),
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
- .build();
-
- BUILTIN_AGG_WITH_RETRACTION =
- TableTestProgram.of(
- "builtin-agg-with-retraction",
- "validates builtin agg with retraction")
- .setupTableSource(
- SourceTestStep.newBuilder("t")
- .addSchema("v VARIANT")
- .addOption("changelog-mode",
"I,UB,UA,D")
- .producedValues(
- Row.of(v1),
- Row.of(v2),
- Row.of(v2),
- Row.ofKind(RowKind.DELETE, v1))
- .build())
- .setupTableSink(
- SinkTestStep.newBuilder("sink_t")
- .addSchema(
- "fv VARIANT", "lv VARIANT", "c
BIGINT", "dc BIGINT")
- .consumedValues(
- Row.of(v1, v1, 1, 1),
-
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v1, 1, 1),
-
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 2, 2),
-
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v2, 2, 2),
-
Row.ofKind(RowKind.UPDATE_AFTER, v1, v2, 3, 2),
-
Row.ofKind(RowKind.UPDATE_BEFORE, v1, v2, 3, 2),
-
Row.ofKind(RowKind.UPDATE_AFTER, v2, v2, 2, 1))
- .build())
- .runSql(
- "INSERT INTO sink_t SELECT FIRST_VALUE(v),
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
- .build();
- }
+ static final Variant V1 = BUILDER.of(1);
+ static final Variant V2 = BUILDER.of(2);
+
+ static final TableTestProgram BUILTIN_AGG =
+ TableTestProgram.of("builtin-agg", "validates builtin agg")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("v VARIANT")
+ .producedValues(Row.of(V1), Row.of(V2),
Row.of(V2))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("fv VARIANT", "lv VARIANT", "c
BIGINT", "dc BIGINT")
+ .consumedValues(
+ Row.of(V1, V1, 1, 1),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
V1, V1, 1, 1),
+ Row.ofKind(RowKind.UPDATE_AFTER,
V1, V2, 2, 2),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
V1, V2, 2, 2),
+ Row.ofKind(RowKind.UPDATE_AFTER,
V1, V2, 3, 2))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT FIRST_VALUE(v),
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
+ .build();
+
+ static final TableTestProgram BUILTIN_AGG_WITH_RETRACTION =
+ TableTestProgram.of(
+ "builtin-agg-with-retraction", "validates builtin
agg with retraction")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("v VARIANT")
+ .addOption("changelog-mode", "I,UB,UA,D")
+ .producedValues(
+ Row.of(V1),
+ Row.of(V2),
+ Row.of(V2),
+ Row.ofKind(RowKind.DELETE, V1))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema("fv VARIANT", "lv VARIANT", "c
BIGINT", "dc BIGINT")
+ .consumedValues(
+ Row.of(V1, V1, 1, 1),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
V1, V1, 1, 1),
+ Row.ofKind(RowKind.UPDATE_AFTER,
V1, V2, 2, 2),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
V1, V2, 2, 2),
+ Row.ofKind(RowKind.UPDATE_AFTER,
V1, V2, 3, 2),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
V1, V2, 3, 2),
+ Row.ofKind(RowKind.UPDATE_AFTER,
V2, V2, 2, 1))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT FIRST_VALUE(v),
LAST_VALUE(v), COUNT(v), COUNT(DISTINCT v) FROM t")
+ .build();
static final TableTestProgram VARIANT_AS_UDF_ARG =
TableTestProgram.of("variant-as-udf-arg", "validates variant as
udf argument")
@@ -256,18 +247,6 @@ public class VariantSemanticTest extends SemanticTestBase {
.runSql("INSERT INTO sink_t SELECT k, SUM(v) AS total FROM
t GROUP BY k")
.build();
- static final TableTestProgram VARIANT_ARRAY_ACCESS;
-
- static final TableTestProgram
VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES;
-
- static final TableTestProgram VARIANT_OBJECT_ACCESS;
-
- static final TableTestProgram VARIANT_NESTED_ACCESS;
-
- static final TableTestProgram VARIANT_ARRAY_ERROR_ACCESS;
-
- static final TableTestProgram VARIANT_OBJECT_ERROR_ACCESS;
-
public static final SourceTestStep VARIANT_ARRAY_SOURCE =
SourceTestStep.newBuilder("t")
.addSchema("v VARIANT")
@@ -322,148 +301,152 @@ public class VariantSemanticTest extends SemanticTestBase {
new Row(3))
.build();
- public static final SourceTestStep VARIANT_NESTED_SOURCE =
- SourceTestStep.newBuilder("t")
- .addSchema("v VARIANT")
- .producedValues(
- Row.of(
- BUILDER.object()
- .add(
- "users",
- BUILDER.array()
- .add(
-
BUILDER.object()
-
.add(
-
"id",
-
BUILDER.of(1))
-
.add(
-
"name",
-
BUILDER.of(
-
"Alice"))
-
.build())
- .build())
- .build()),
- new Row(1))
+ static final TableTestProgram VARIANT_ARRAY_ACCESS =
+ TableTestProgram.of(
+ "variant-array-access",
+ "validates variant array access using [] operator
in sql and at() in table api")
+ .setupTableSource(VARIANT_ARRAY_SOURCE)
+ .setupTableSink(VARIANT_ARRAY_SINK)
+ .runSql("INSERT INTO sink_t SELECT v[1], v[2], v[3] FROM
t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+ $("v").at(1).as("v1"),
+ $("v").at(2).as("v2"),
+ $("v").at(3).as("v3")),
+ "sink_t")
.build();
- public static final SinkTestStep VARIANT_NESTED_SINK =
- SinkTestStep.newBuilder("sink_t")
- .addSchema("user_id VARIANT", "user_name VARIANT")
- .consumedValues(Row.of(BUILDER.of(1),
BUILDER.of("Alice")), new Row(2))
+ static final TableTestProgram
VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES =
+ TableTestProgram.of(
+ "variant-array-access-with-different-index-types",
+ "validates variant array access with different
index types using [] operator in sql and at() in table api")
+ .setupTableSource(VARIANT_ARRAY_SOURCE)
+ .setupTableSink(VARIANT_ARRAY_SINK)
+ .runSql(
+ "INSERT INTO sink_t SELECT v[cast(1 as tinyint)],
v[cast(2 as smallint)], v[cast(4 as bigint)] FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+ $("v").at((byte)
1).as("v1"),
+ $("v").at((short)
2).as("v2"),
+ $("v").at((long)
3).as("v3")),
+ "sink_t")
.build();
- static {
- VARIANT_ARRAY_ACCESS =
- TableTestProgram.of(
- "variant-array-access",
- "validates variant array access using []
operator in sql and at() in table api")
- .setupTableSource(VARIANT_ARRAY_SOURCE)
- .setupTableSink(VARIANT_ARRAY_SINK)
- .runSql("INSERT INTO sink_t SELECT v[1], v[2], v[3]
FROM t")
- .runTableApi(
- t ->
- t.from("t")
- .select(
- $("v").at(1).as("v1"),
- $("v").at(2).as("v2"),
- $("v").at(3).as("v3")),
- "sink_t")
- .build();
-
- VARIANT_ARRAY_ACCESS_WITH_DIFFERENT_INDEX_TYPES =
- TableTestProgram.of(
-
"variant-array-access-with-different-index-types",
- "validates variant array access with different
index types using [] operator in sql and at() in table api")
- .setupTableSource(VARIANT_ARRAY_SOURCE)
- .setupTableSink(VARIANT_ARRAY_SINK)
- .runSql(
- "INSERT INTO sink_t SELECT v[cast(1 as
tinyint)], v[cast(2 as smallint)], v[cast(4 as bigint)] FROM t")
- .runTableApi(
- t ->
- t.from("t")
- .select(
- $("v").at((byte)
1).as("v1"),
- $("v").at((short)
2).as("v2"),
- $("v").at((long)
3).as("v3")),
- "sink_t")
- .build();
-
- VARIANT_OBJECT_ACCESS =
- TableTestProgram.of(
- "variant-object-access",
- "validates variant object field access using
[] operator in sql and at() in table api")
- .setupTableSource(VARIANT_OBJECT_SOURCE)
- .setupTableSink(VARIANT_OBJECT_SINK)
- .runSql("INSERT INTO sink_t SELECT v['name'],
v['age'], v['city'] FROM t")
- .runTableApi(
- t ->
- t.from("t")
- .select(
-
$("v").at("name").as("name"),
-
$("v").at("age").as("age"),
-
$("v").at("city").as("city")),
- "sink_t")
- .build();
-
- VARIANT_NESTED_ACCESS =
- TableTestProgram.of(
- "variant-nested-access",
- "validates variant nested access using []
operator in sql and at() in table api")
- .setupTableSource(VARIANT_NESTED_SOURCE)
- .setupTableSink(VARIANT_NESTED_SINK)
- .runSql(
- "INSERT INTO sink_t SELECT
v['users'][1]['id'], v['users'][1]['name'] FROM t")
- .runTableApi(
- t ->
- t.from("t")
- .select(
- $("v").at("users")
- .at(1)
- .at("id")
- .as("user_id"),
- $("v").at("users")
- .at(1)
- .at("name")
-
.as("user_name")),
- "sink_t")
- .build();
-
- VARIANT_ARRAY_ERROR_ACCESS =
- TableTestProgram.of(
- "variant-array-error-access",
- "validates variant array access using []
operator in sql and at() in table api with string")
- .setupTableSource(VARIANT_ARRAY_SOURCE)
- .runFailingSql(
- "SELECT v['1'], v['2'], v['3'] FROM t",
- TableRuntimeException.class,
- "String key access on variant requires an
object variant, but a non-object variant was provided.")
- .runFailingSql(
- "SELECT v[1.5], v[4.2], v[3.3] FROM t",
- ValidationException.class,
- "Cannot apply 'ITEM' to arguments of type
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
- + "<MAP>[<ANY>]\n"
- + "<ROW>[<CHARACTER>|<INTEGER>]\n"
- + "<VARIANT>[<CHARACTER>|<INTEGER>]")
- .build();
-
- VARIANT_OBJECT_ERROR_ACCESS =
- TableTestProgram.of(
- "variant-object-error-access",
- "validates variant object field access using
[] operator in sql and at() in table api")
- .setupTableSource(VARIANT_OBJECT_SOURCE)
- .runFailingSql(
- "SELECT v[1], v[2], v[3] FROM t",
- TableRuntimeException.class,
- "Integer index access on variant requires an
array variant, but a non-array variant was provided.")
- .runFailingSql(
- "SELECT v[1.5], v[4.2], v[3.3] FROM t",
- ValidationException.class,
- "Cannot apply 'ITEM' to arguments of type
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
- + "<MAP>[<ANY>]\n"
- + "<ROW>[<CHARACTER>|<INTEGER>]\n"
- + "<VARIANT>[<CHARACTER>|<INTEGER>]")
- .build();
- }
+ static final TableTestProgram VARIANT_OBJECT_ACCESS =
+ TableTestProgram.of(
+ "variant-object-access",
+ "validates variant object field access using []
operator in sql and at() in table api")
+ .setupTableSource(VARIANT_OBJECT_SOURCE)
+ .setupTableSink(VARIANT_OBJECT_SINK)
+ .runSql("INSERT INTO sink_t SELECT v['name'], v['age'],
v['city'] FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+
$("v").at("name").as("name"),
+ $("v").at("age").as("age"),
+
$("v").at("city").as("city")),
+ "sink_t")
+ .build();
+
+ static final Variant COMPLEX_VARIANT =
+ BUILDER.object()
+ .add(
+ "users",
+ BUILDER.array()
+ .add(
+ BUILDER.object()
+ .add("id", BUILDER.of(1))
+ .add("name",
BUILDER.of("Alice"))
+ .add(
+ "special ' \" \\
\u0000",
+ BUILDER.of("Bob"))
+ .build())
+ .build())
+ .build();
+
+ static final TableTestProgram VARIANT_NESTED_ACCESS =
+ TableTestProgram.of(
+ "variant-nested-access",
+ "validates variant nested access using [] operator
in sql and at() in table api")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema("v VARIANT")
+ .producedValues(Row.of(COMPLEX_VARIANT),
new Row(1))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(
+ "user_id VARIANT",
+ "user_name VARIANT",
+ "special_name VARIANT")
+ .consumedValues(
+ Row.of(
+ BUILDER.of(1),
+ BUILDER.of("Alice"),
+ BUILDER.of("Bob")),
+ new Row(3))
+ .build())
+ .runSql(
+ "INSERT INTO sink_t SELECT "
+ + "v['users'][1]['id'], "
+ + "v['users'][1]['name'], "
+ + "v['users'][1]['special '' \" \\
\u0000'] FROM t")
+ .runTableApi(
+ t ->
+ t.from("t")
+ .select(
+
$("v").at("users").at(1).at("id").as("user_id"),
+ $("v").at("users")
+ .at(1)
+ .at("name")
+ .as("user_name"),
+ $("v").at("users")
+ .at(1)
+ .at("special ' \"
\\ \u0000")
+
.as("special_name")),
+ "sink_t")
+ .build();
+
+ static final TableTestProgram VARIANT_ARRAY_ERROR_ACCESS =
+ TableTestProgram.of(
+ "variant-array-error-access",
+ "validates variant array access using [] operator
in sql and at() in table api with string")
+ .setupTableSource(VARIANT_ARRAY_SOURCE)
+ .runFailingSql(
+ "SELECT v['1'], v['2'], v['3'] FROM t",
+ TableRuntimeException.class,
+ "String key access on variant requires an object
variant, but a non-object variant was provided.")
+ .runFailingSql(
+ "SELECT v[1.5], v[4.2], v[3.3] FROM t",
+ ValidationException.class,
+ "Cannot apply 'ITEM' to arguments of type
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
+ + "<MAP>[<ANY>]\n"
+ + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+ + "<VARIANT>[<CHARACTER>|<INTEGER>]")
+ .build();
+
+ static final TableTestProgram VARIANT_OBJECT_ERROR_ACCESS =
+ TableTestProgram.of(
+ "variant-object-error-access",
+ "validates variant object field access using []
operator in sql and at() in table api")
+ .setupTableSource(VARIANT_OBJECT_SOURCE)
+ .runFailingSql(
+ "SELECT v[1], v[2], v[3] FROM t",
+ TableRuntimeException.class,
+ "Integer index access on variant requires an array
variant, but a non-array variant was provided.")
+ .runFailingSql(
+ "SELECT v[1.5], v[4.2], v[3.3] FROM t",
+ ValidationException.class,
+ "Cannot apply 'ITEM' to arguments of type
'ITEM(<VARIANT>, <DECIMAL(2, 1)>)'. Supported form(s): <ARRAY>[<INTEGER>]\n"
+ + "<MAP>[<ANY>]\n"
+ + "<ROW>[<CHARACTER>|<INTEGER>]\n"
+ + "<VARIANT>[<CHARACTER>|<INTEGER>]")
+ .build();
@Override
public List<TableTestProgram> programs() {