twalthr commented on code in PR #28113:
URL: https://github.com/apache/flink/pull/28113#discussion_r3186789163
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java:
##########
@@ -169,6 +177,70 @@ public static boolean isFunctionKind(SqlOperator operator,
FunctionKind kind) {
return functionDefinition != null && functionDefinition.getKind() ==
kind;
}
+ public static boolean isOneOfFunctionDefinitions(
+ RexNode rexNode, FunctionDefinition... expectedDefinitions) {
+ if (!(rexNode instanceof RexCall)) {
+ return false;
+ }
+ final RexCall call = (RexCall) rexNode;
+ final FunctionDefinition unwrapped = unwrapFunctionDefinition(call);
+ final String operatorName = call.getOperator().getName();
+ for (FunctionDefinition expected : expectedDefinitions) {
+ if (unwrapped != null && unwrapped == expected) {
+ return true;
+ }
+ if (expected instanceof BuiltInFunctionDefinition
+ && ((BuiltInFunctionDefinition) expected)
+ .getName()
+ .equalsIgnoreCase(operatorName)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static RexNode expandLocalRef(RexNode operand, @Nullable
List<RexNode> exprs) {
Review Comment:
This doesn't really fit into ShortcutUtils. ShortcutUtils are meant for
simplifying getter chains. Maybe introduce a `RexUtils`?
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/ShortcutUtils.java:
##########
@@ -169,6 +177,70 @@ public static boolean isFunctionKind(SqlOperator operator,
FunctionKind kind) {
return functionDefinition != null && functionDefinition.getKind() ==
kind;
}
+ public static boolean isOneOfFunctionDefinitions(
+ RexNode rexNode, FunctionDefinition... expectedDefinitions) {
+ if (!(rexNode instanceof RexCall)) {
+ return false;
+ }
+ final RexCall call = (RexCall) rexNode;
+ final FunctionDefinition unwrapped = unwrapFunctionDefinition(call);
+ final String operatorName = call.getOperator().getName();
+ for (FunctionDefinition expected : expectedDefinitions) {
+ if (unwrapped != null && unwrapped == expected) {
+ return true;
+ }
+ if (expected instanceof BuiltInFunctionDefinition
+ && ((BuiltInFunctionDefinition) expected)
+ .getName()
+ .equalsIgnoreCase(operatorName)) {
Review Comment:
Mapping by name is problematic; it could lead to weird side effects. Can't
we compare enum values or definition constants?
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonStringCallGen.scala:
##########
@@ -23,18 +23,22 @@ import
org.apache.flink.table.planner.codegen.JsonGenerateUtils.createNodeTerm
import org.apache.flink.table.runtime.functions.SqlJsonUtils
import org.apache.flink.table.types.logical.LogicalType
-import org.apache.calcite.rex.RexCall
+import org.apache.calcite.rex.{RexCall, RexProgram}
/** [[CallGenerator]] for `JSON_STRING`. */
-class JsonStringCallGen(call: RexCall) extends CallGenerator {
+class JsonStringCallGen(call: RexCall, rexProgram: RexProgram) extends
CallGenerator {
+
+ def this(call: RexCall) = this(call, null)
Review Comment:
```suggestion
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -487,15 +544,59 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
generateNullLiteral(resultType)
// We only support the JSON function inside of JSON_OBJECT or JSON_ARRAY
- case (operand: RexNode, i) if isSupportedJsonOperand(operand, call, i) =>
+ case (operand: RexNode, i)
+ if isSupportedJsonOperand(
+ operand,
+ call,
+ i,
+ if (rexProgram == null) null else rexProgram.getExprList) =>
generateJsonCall(operand)
+ case (o @ _, i) if condIdxs.contains(i) => visitOperandInScopedCache(o)
+
case (o @ _, _) => o.accept(this)
}
generateCallExpression(ctx, call, operands, resultType)
}
+ /**
+ * Indices of `call`'s operands that are NOT unconditionally evaluated at
runtime. Used to scope
+ * the RexLocalRef cache so that bodies cached while visiting these operands
are not hoisted out
+ * of the surrounding short-circuit / if-block.
+ *
+ * - `CASE(when_1, then_1, when_2, then_2, ..., else)`: only `when_1` is
unconditional.
+ * - `AND(a_0, a_1, ..., a_n)` / `OR(...)`: only `a_0` is unconditional;
subsequent operands are
+ * short-circuited by the operator semantics and the codegen.
+ */
+ private def conditionalOperandIndices(call: RexCall): Set[Int] =
call.getKind match {
+ case SqlKind.CASE | SqlKind.AND | SqlKind.OR | SqlKind.COALESCE =>
+ (1 until call.getOperands.size).toSet
+ case _ => Set.empty
+ }
+
+ private def visitOperandInScopedCache(operand: RexNode): GeneratedExpression
= {
+ ctx.pushLocalRefScope()
+ val (operandExpr, scopedBodies) =
+ try {
+ val expr = operand.accept(this)
+ val popped = ctx.popLocalRefScope()
+ (expr, popped.values.map(_.code).mkString("\n"))
+ } catch {
+ case t: Throwable =>
Review Comment:
when can a Throwable happen?
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/JsonFunctionsITCase.java:
##########
@@ -1540,6 +1540,166 @@ private static List<TestSetSpec> jsonArraySpec() {
STRING().notNull()));
}
+ /**
+ * Pins the local-ref / common-sub-expression handling for JSON
construction calls.
+ *
+ * <p>When two projections share a JSON-producing sub-expression, the
planner deduplicates them
+ * into a {@link org.apache.calcite.rex.RexLocalRef} that points at the
shared {@link
+ * org.apache.calcite.rex.RexCall}. The codegen helpers in {@code
JsonGenerateUtils} must
+ * dereference that local ref through the surrounding {@code RexProgram}
to recognize it as a
+ * JSON / JSON_OBJECT / JSON_ARRAY operand and embed the value as a raw
JSON node. Without that
+ * dereference the helpers see a plain {@code RexLocalRef}, fall back to
the string-quoting
+ * branch, and produce wrong output (e.g. {@code "{\"k\":\"[1,2,3]\"}"}
instead of {@code
+ * "{\"k\":[1,2,3]}"}).
+ *
+ * <p>The scenarios below cover each callsite — {@code JsonObjectCallGen},
{@code
+ * JsonArrayCallGen}, {@code JsonStringCallGen} — and each branch of the
inspection helpers
+ * ({@code JSON}, {@code JSON_OBJECT}, {@code JSON_ARRAY}).
+ */
+ private static List<TestSetSpec> jsonLocalRefReuseSpec() {
+ return List.of(
+ // Shared JSON(f) inside two JSON_OBJECT projections.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.JSON_OBJECT,
+ "Shared JSON(f) sub-expression across
JSON_OBJECT projections")
+ .onFieldsWithData("[1,2,3]")
+ .andDataTypes(STRING())
+ .testResult(
+ resultSpec(
+ jsonObject(JsonOnNull.NULL, "k1",
json($("f0"))),
+ "JSON_OBJECT(KEY 'k1' VALUE JSON(f0))",
+ "{\"k1\":[1,2,3]}",
+ STRING().notNull(),
+ STRING().notNull()),
+ resultSpec(
+ jsonObject(JsonOnNull.NULL, "k2",
json($("f0"))),
+ "JSON_OBJECT(KEY 'k2' VALUE JSON(f0))",
+ "{\"k2\":[1,2,3]}",
+ STRING().notNull(),
+ STRING().notNull())),
+ // Shared JSON_ARRAY(...) inside two JSON_OBJECT projections.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.JSON_OBJECT,
+ "Shared JSON_ARRAY sub-expression across
JSON_OBJECT projections")
+ .onFieldsWithData(1, 2, 3)
+ .andDataTypes(INT(), INT(), INT())
+ .testResult(
+ resultSpec(
+ jsonObject(
+ JsonOnNull.NULL,
+ "a",
+ jsonArray(
+ JsonOnNull.NULL,
+ $("f0"),
+ $("f1"),
+ $("f2"))),
+ "JSON_OBJECT(KEY 'a' VALUE
JSON_ARRAY(f0, f1, f2))",
+ "{\"a\":[1,2,3]}",
+ STRING().notNull(),
+ STRING().notNull()),
+ resultSpec(
+ jsonObject(
+ JsonOnNull.NULL,
+ "b",
+ jsonArray(
+ JsonOnNull.NULL,
+ $("f0"),
+ $("f1"),
+ $("f2"))),
+ "JSON_OBJECT(KEY 'b' VALUE
JSON_ARRAY(f0, f1, f2))",
+ "{\"b\":[1,2,3]}",
+ STRING().notNull(),
+ STRING().notNull())),
+ // Shared inner JSON_OBJECT inside two outer JSON_OBJECT
projections.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.JSON_OBJECT,
+ "Shared inner JSON_OBJECT across outer
JSON_OBJECT projections")
+ .onFieldsWithData("V")
+ .andDataTypes(STRING())
+ .testResult(
+ resultSpec(
+ jsonObject(
+ JsonOnNull.NULL,
+ "outer1",
+ jsonObject(JsonOnNull.NULL,
"inner", $("f0"))),
+ "JSON_OBJECT(KEY 'outer1' VALUE
JSON_OBJECT(KEY 'inner' VALUE f0))",
+ "{\"outer1\":{\"inner\":\"V\"}}",
+ STRING().notNull(),
+ STRING().notNull()),
+ resultSpec(
+ jsonObject(
+ JsonOnNull.NULL,
+ "outer2",
+ jsonObject(JsonOnNull.NULL,
"inner", $("f0"))),
+ "JSON_OBJECT(KEY 'outer2' VALUE
JSON_OBJECT(KEY 'inner' VALUE f0))",
+ "{\"outer2\":{\"inner\":\"V\"}}",
+ STRING().notNull(),
+ STRING().notNull())),
+ // Shared JSON_OBJECT inside two JSON_ARRAY projections.
+ TestSetSpec.forFunction(
+ BuiltInFunctionDefinitions.JSON_ARRAY,
+ "Shared JSON_OBJECT inside JSON_ARRAY across
projections")
+ .onFieldsWithData("V")
+ .andDataTypes(STRING())
+ .testResult(
+ resultSpec(
+ jsonArray(
+ JsonOnNull.NULL,
+ jsonObject(JsonOnNull.NULL,
"k", $("f0"))),
+ "JSON_ARRAY(JSON_OBJECT(KEY 'k' VALUE
f0))",
+ "[{\"k\":\"V\"}]",
+ STRING().notNull(),
+ STRING().notNull()),
+ resultSpec(
+ jsonArray(
+ JsonOnNull.NULL,
+ jsonObject(JsonOnNull.NULL,
"k", $("f0"))),
+ "JSON_ARRAY(JSON_OBJECT(KEY 'k' VALUE
f0))",
+ "[{\"k\":\"V\"}]",
+ STRING().notNull(),
+ STRING().notNull())),
+ // Shared JSON(f) inside two JSON_ARRAY projections.
Review Comment:
> inside two JSON_ARRAY projections
I see only one projection. I guess you are assuming two due to internals of
the test base that combines Table API and SQL into one? Maybe it makes sense to
have a test that fully uses SQL to explicitly test two JSON_ARRAY projections.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala:
##########
@@ -37,7 +37,10 @@ import org.apache.calcite.rex.RexCall
* We remedy this by treating nested calls to this function differently and
inserting the value as a
* raw node instead of as a string node.
*/
-class JsonObjectCallGen(call: RexCall) extends CallGenerator {
+class JsonObjectCallGen(call: RexCall, rexProgram: RexProgram) extends
CallGenerator {
+
+ def this(call: RexCall) = this(call, null)
Review Comment:
remove
```suggestion
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CalcCodeGenerator.scala:
##########
@@ -181,26 +195,58 @@ object CalcCodeGenerator {
val filterInputCode = ctx.reuseInputUnboxingCode()
val filterInputSet = Set(ctx.reusableInputUnboxingExprs.keySet.toSeq:
_*)
+ val filterLocalRefSet: Set[Int] =
ctx.reusableLocalRefExprs.keySet.toSet
+
// if any filter conditions, projection code will enter an new scope
val projectionCode = produceProjectionCode
val projectionInputCode = ctx.reusableInputUnboxingExprs
- .filter(entry => !filterInputSet.contains(entry._1))
+ .filter { case (k, _) => !filterInputSet.contains(k) }
+ .values
+ .map(_.code)
+ .mkString("\n")
+
+ val filterLocalRefCode = ctx.reusableLocalRefExprs
+ .filter { case (k, _) => filterLocalRefSet.contains(k) }
.values
.map(_.code)
.mkString("\n")
+ val projectionLocalRefCode = ctx.reusableLocalRefExprs
+ .filter { case (k, _) => !filterLocalRefSet.contains(k) }
+ .values
+ .map(_.code)
+ .mkString("\n")
+
s"""
|${if (eagerInputUnboxingCode) filterInputCode else ""}
+ |$filterLocalRefCode
|${filterCondition.code}
|if (${filterCondition.resultTerm}) {
- | ${if (eagerInputUnboxingCode) projectionInputCode else ""}
+ | ${if (eagerInputUnboxingCode) projectionInputCode else ""}
+ | $projectionLocalRefCode
| $projectionCode
|}
|""".stripMargin
}
}
}
+ private def buildRexProgram(
+ classLoader: ClassLoader,
+ inputType: RowType,
+ projection: Seq[RexNode],
+ condition: Option[RexNode]): RexProgram = {
+ val typeFactory = new FlinkTypeFactory(classLoader,
FlinkTypeSystem.INSTANCE)
Review Comment:
Can't we reference an existing factory already? Creating it in place could
cause issues in the future.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BridgingSqlFunctionCallGen.scala:
##########
@@ -37,7 +37,9 @@ import java.util.Collections
* generator will be a reference to a [[WrappingCollector]]. Furthermore,
atomic types are wrapped
* into a row by the collector.
*/
-class BridgingSqlFunctionCallGen(call: RexCall) extends CallGenerator {
+class BridgingSqlFunctionCallGen(call: RexCall, rexProgram: RexProgram)
extends CallGenerator {
+
+ def this(call: RexCall) = this(call, null)
Review Comment:
remove?
```suggestion
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala:
##########
@@ -25,10 +25,13 @@ import
org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNodeTerm,
import org.apache.flink.table.runtime.functions.SqlJsonUtils
import org.apache.flink.table.types.logical.LogicalType
-import org.apache.calcite.rex.RexCall
+import org.apache.calcite.rex.{RexCall, RexProgram}
/** [[CallGenerator]] for `JSON_ARRAY`. */
-class JsonArrayCallGen(call: RexCall) extends CallGenerator {
+class JsonArrayCallGen(call: RexCall, rexProgram: RexProgram) extends
CallGenerator {
+
+ def this(call: RexCall) = this(call, null)
Review Comment:
remove
```suggestion
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -462,21 +513,27 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
val resultType = FlinkTypeFactory.toLogicalType(call.getType)
// throw exception if json function is called outside JSON_OBJECT or
JSON_ARRAY function
- if (isJsonFunctionOperand(call)) {
+ if (isJsonFunctionOperand(call, if (rexProgram == null) null else
rexProgram.getExprList)) {
Review Comment:
I see `if (rexProgram == null) null else rexProgram.getExprList` occurs a
couple of times. Maybe move it to `CodeGenUtils` and reuse.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -462,21 +513,27 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
val resultType = FlinkTypeFactory.toLogicalType(call.getType)
// throw exception if json function is called outside JSON_OBJECT or
JSON_ARRAY function
- if (isJsonFunctionOperand(call)) {
+ if (isJsonFunctionOperand(call, if (rexProgram == null) null else
rexProgram.getExprList)) {
throw new ValidationException(
"The JSON() function is currently only supported inside JSON_ARRAY()
or as the VALUE param" +
" of JSON_OBJECT(). Example: JSON_OBJECT('a', JSON('{\"key\":
\"value\"}')) or " +
"JSON_ARRAY(JSON('{\"key\": \"value\"}')).")
}
if (call.getKind == SqlKind.SEARCH) {
- return generateSearch(
- ctx,
- generateExpression(call.getOperands.get(0)),
- call.getOperands.get(1).asInstanceOf[RexLiteral])
+ val sargLiteral =
+ if (rexProgram != null &&
call.getOperands.get(1).isInstanceOf[RexLocalRef]) {
+ rexProgram.getExprList
+ .get(call.getOperands.get(1).asInstanceOf[RexLocalRef].getIndex)
+ .asInstanceOf[RexLiteral]
+ } else {
+ call.getOperands.get(1).asInstanceOf[RexLiteral]
+ }
Review Comment:
put this into `generateSearch`
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -487,15 +544,59 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext,
nullableInput: Boolean)
generateNullLiteral(resultType)
// We only support the JSON function inside of JSON_OBJECT or JSON_ARRAY
- case (operand: RexNode, i) if isSupportedJsonOperand(operand, call, i) =>
+ case (operand: RexNode, i)
+ if isSupportedJsonOperand(
+ operand,
+ call,
+ i,
+ if (rexProgram == null) null else rexProgram.getExprList) =>
generateJsonCall(operand)
+ case (o @ _, i) if condIdxs.contains(i) => visitOperandInScopedCache(o)
+
case (o @ _, _) => o.accept(this)
}
generateCallExpression(ctx, call, operands, resultType)
}
+ /**
+ * Indices of `call`'s operands that are NOT unconditionally evaluated at
runtime. Used to scope
+ * the RexLocalRef cache so that bodies cached while visiting these operands
are not hoisted out
+ * of the surrounding short-circuit / if-block.
+ *
+ * - `CASE(when_1, then_1, when_2, then_2, ..., else)`: only `when_1` is
unconditional.
+ * - `AND(a_0, a_1, ..., a_n)` / `OR(...)`: only `a_0` is unconditional;
subsequent operands are
+ * short-circuited by the operator semantics and the codegen.
+ */
+ private def conditionalOperandIndices(call: RexCall): Set[Int] =
call.getKind match {
+ case SqlKind.CASE | SqlKind.AND | SqlKind.OR | SqlKind.COALESCE =>
+ (1 until call.getOperands.size).toSet
+ case _ => Set.empty
+ }
+
+ private def visitOperandInScopedCache(operand: RexNode): GeneratedExpression
= {
+ ctx.pushLocalRefScope()
+ val (operandExpr, scopedBodies) =
+ try {
+ val expr = operand.accept(this)
+ val popped = ctx.popLocalRefScope()
+ (expr, popped.values.map(_.code).mkString("\n"))
+ } catch {
+ case t: Throwable =>
+ ctx.popLocalRefScope()
+ throw t
+ }
+ if (scopedBodies.isEmpty) operandExpr
Review Comment:
according to Flink guidelines we use `{}`
```suggestion
if (scopedBodies.isEmpty) {operandExpr
```
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -968,13 +969,390 @@ void testRawLiteralScalarFunction() throws Exception {
assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
}
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("inputForTestCalcLocalRefReuse")
+ void testCalcLocalRefReuse(
+ String sql, List<Row> expectedRows, int expectedDetCalls, int
expectedNonDetCalls) {
+ final List<Row> sourceData = List.of(Row.of("Bob"), Row.of("Alice"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+ CountingUpperScalarFunction.COUNT.set(0);
+ NonDeterministicCountingScalarFunction.COUNT.set(0);
+
+ tEnv().createTemporarySystemFunction("Det",
CountingUpperScalarFunction.class);
+ tEnv().createTemporarySystemFunction(
+ "Nondet",
NonDeterministicCountingScalarFunction.class);
+ tEnv().executeSql("CREATE TABLE SourceTable (s STRING) WITH
('connector' = 'COLLECTION')");
+
+ final List<Row> actual =
CollectionUtil.iteratorToList(tEnv().executeSql(sql).collect());
+
+ assertThat(actual).containsExactlyElementsOf(expectedRows);
+ assertThat(CountingUpperScalarFunction.COUNT.get())
+ .as("Deterministic invocations")
+ .isEqualTo(expectedDetCalls);
+ assertThat(NonDeterministicCountingScalarFunction.COUNT.get())
+ .as("Non-deterministic invocations")
+ .isEqualTo(expectedNonDetCalls);
+ }
+
+ static Stream<Arguments> inputForTestCalcLocalRefReuse() {
+ return Stream.of(
+ Arguments.of(
+ "SELECT Det(s), Det(s), Det(s) FROM SourceTable",
+ List.of(Row.of("BOB", "BOB", "BOB"), Row.of("ALICE",
"ALICE", "ALICE")),
+ 2, // expected localref calls: rows × 1 (cached)
+ 0),
+ Arguments.of(
+ "SELECT Det(s), Det(s), UPPER(s) FROM SourceTable",
+ List.of(Row.of("BOB", "BOB", "BOB"), Row.of("ALICE",
"ALICE", "ALICE")),
+ 2, // rows × 1 (cached); built-in UPPER not counted
+ 0),
+ Arguments.of(
+ "SELECT Det(Det(s)), Det(Det(s)), Det(Det(s)) FROM
SourceTable",
+ List.of(Row.of("BOB", "BOB", "BOB"), Row.of("ALICE",
"ALICE", "ALICE")),
+ 4, // rows × 2 layers
+ 0),
+ Arguments.of(
+ "SELECT Nondet(s), Nondet(s), Nondet(s) FROM
SourceTable",
+ List.of(
+ Row.of("BOB_1", "BOB_2", "BOB_3"),
+ Row.of("ALICE_4", "ALICE_5", "ALICE_6")),
+ 0,
+ 6 // rows × 3 projections
+ ),
+ Arguments.of(
+ "SELECT Nondet(Det(s)), Nondet(Det(s)), Nondet(Det(s))
FROM SourceTable",
+ List.of(
+ Row.of("BOB_1", "BOB_2", "BOB_3"),
+ Row.of("ALICE_4", "ALICE_5", "ALICE_6")),
+ 2, // rows × 1 (inner cached)
+ 6 // rows × 3 projections
+ ),
+ Arguments.of(
+ "SELECT Det(Nondet(s)), Det(Nondet(s)), Det(Nondet(s))
FROM SourceTable",
+ List.of(
+ Row.of("BOB_1", "BOB_2", "BOB_3"),
+ Row.of("ALICE_4", "ALICE_5", "ALICE_6")),
+ 6, // rows × 3 (nondet input disables cache)
+ 6 // rows × 3 projections
+ ),
+ // shared Det in filter → cached once per row
+ Arguments.of(
+ "SELECT s FROM SourceTable"
+ + " WHERE Det(s) IS NOT NULL AND Det(s) <> ''
AND Det(s) <> ' '",
+ List.of(Row.of("Bob"), Row.of("Alice")),
+ 2,
+ 0),
+ // mixed UDF + built-in
+ Arguments.of(
+ "SELECT s FROM SourceTable"
+ + " WHERE Det(s) IS NOT NULL AND Det(s) <> ''
AND UPPER(s) <> ''",
+ List.of(Row.of("Bob"), Row.of("Alice")),
+ 2,
+ 0),
+ // nested Det in filter; both layers cached
+ Arguments.of(
+ "SELECT s FROM SourceTable"
+ + " WHERE Det(Det(s)) IS NOT NULL"
+ + " AND Det(Det(s)) <> '' AND Det(Det(s)) <> '
'",
+ List.of(Row.of("Bob"), Row.of("Alice")),
+ 4,
+ 0),
+ // non-deterministic in filter — never cached
+ Arguments.of(
+ "SELECT s FROM SourceTable"
+ + " WHERE Nondet(s) IS NOT NULL"
+ + " AND Nondet(s) <> '' AND Nondet(s) <> ' '",
+ List.of(Row.of("Bob"), Row.of("Alice")),
+ 0,
+ 6),
+ // outer nondet, inner Det cached
+ Arguments.of(
+ "SELECT s FROM SourceTable"
+ + " WHERE Nondet(Det(s)) IS NOT NULL"
+ + " AND Nondet(Det(s)) <> '' AND
Nondet(Det(s)) <> ' '",
+ List.of(Row.of("Bob"), Row.of("Alice")),
+ 2,
+ 6),
+ // Det with nondet input → cache bypassed
+ Arguments.of(
+ "SELECT s FROM SourceTable"
+ + " WHERE Det(Nondet(s)) IS NOT NULL"
+ + " AND Det(Nondet(s)) <> '' AND
Det(Nondet(s)) <> ' '",
+ List.of(Row.of("Bob"), Row.of("Alice")),
+ 6,
+ 6),
+ // filter ↔ projection share via unified program
+ Arguments.of(
+ "SELECT Det(s) FROM SourceTable WHERE Det(s) = 'BOB'",
+ List.of(Row.of("BOB")),
+ 2,
+ 0),
+ Arguments.of(
+ "SELECT Det(s), Det(s) FROM SourceTable WHERE Det(s) =
'BOB'",
+ List.of(Row.of("BOB", "BOB")),
+ 2,
+ 0),
+
+ //
---------------------------------------------------------------------------
+ // JSON construction scenarios. These verify that the localref
/ RexProgram CSE
+ // cache also fires when the shared sub-expression is wrapped
inside (or itself
+ // is) a JSON_OBJECT / JSON_ARRAY / JSON_STRING call.
+ //
---------------------------------------------------------------------------
+
+ // JSON_OBJECT × 2 sharing inner Det → cached once per row.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE Det(s)),"
+ + " JSON_OBJECT(KEY 'b' VALUE Det(s))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":\"BOB\"}", "{\"b\":\"BOB\"}"),
+ Row.of("{\"a\":\"ALICE\"}",
"{\"b\":\"ALICE\"}")),
+ 2, // rows × 1 (cached)
+ 0),
+ // JSON_ARRAY × 2 sharing inner Det → cached.
+ Arguments.of(
+ "SELECT JSON_ARRAY(Det(s)), JSON_ARRAY(Det(s)) FROM
SourceTable",
+ List.of(
+ Row.of("[\"BOB\"]", "[\"BOB\"]"),
+ Row.of("[\"ALICE\"]", "[\"ALICE\"]")),
+ 2,
+ 0),
+ // JSON_STRING × 2 sharing inner Det → cached.
+ Arguments.of(
+ "SELECT JSON_STRING(Det(s)), JSON_STRING(Det(s)) FROM
SourceTable",
+ List.of(Row.of("\"BOB\"", "\"BOB\""),
Row.of("\"ALICE\"", "\"ALICE\"")),
+ 2,
+ 0),
+ // Mixed JSON_OBJECT + JSON_ARRAY sharing same Det.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'k' VALUE Det(s)),
JSON_ARRAY(Det(s))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"k\":\"BOB\"}", "[\"BOB\"]"),
+ Row.of("{\"k\":\"ALICE\"}", "[\"ALICE\"]")),
+ 2,
+ 0),
+ // Mixed JSON_OBJECT + JSON_STRING sharing same Det.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'k' VALUE Det(s)),
JSON_STRING(Det(s))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"k\":\"BOB\"}", "\"BOB\""),
+ Row.of("{\"k\":\"ALICE\"}", "\"ALICE\"")),
+ 2,
+ 0),
+ // JSON_OBJECT × 3 sharing same Det → cached across all 3
sites.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE Det(s)),"
+ + " JSON_OBJECT(KEY 'b' VALUE Det(s)),"
+ + " JSON_OBJECT(KEY 'c' VALUE Det(s))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":\"BOB\"}", "{\"b\":\"BOB\"}",
"{\"c\":\"BOB\"}"),
+ Row.of(
+ "{\"a\":\"ALICE\"}",
+ "{\"b\":\"ALICE\"}",
+ "{\"c\":\"ALICE\"}")),
+ 2,
+ 0),
+ // Nested Det(Det(s)) inside two JSON_OBJECT projections →
both layers cached.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE Det(Det(s))),"
+ + " JSON_OBJECT(KEY 'b' VALUE Det(Det(s)))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":\"BOB\"}", "{\"b\":\"BOB\"}"),
+ Row.of("{\"a\":\"ALICE\"}",
"{\"b\":\"ALICE\"}")),
+ 4, // rows × 2 layers
+ 0),
+ // Nondet inside two JSON_OBJECT projections → never cached.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE Nondet(s)),"
+ + " JSON_OBJECT(KEY 'b' VALUE Nondet(s))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":\"BOB_1\"}",
"{\"b\":\"BOB_2\"}"),
+ Row.of("{\"a\":\"ALICE_3\"}",
"{\"b\":\"ALICE_4\"}")),
+ 0,
+ 4 // rows × 2 projections
+ ),
+ // Outer Nondet, inner Det inside two JSON_OBJECT projections
— Det cached.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE Nondet(Det(s))),"
+ + " JSON_OBJECT(KEY 'b' VALUE Nondet(Det(s)))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":\"BOB_1\"}",
"{\"b\":\"BOB_2\"}"),
+ Row.of("{\"a\":\"ALICE_3\"}",
"{\"b\":\"ALICE_4\"}")),
+ 2, // inner Det cached
+ 4),
+ // Outer Det, inner Nondet → outer cache disabled by nondet
operand.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE Det(Nondet(s))),"
+ + " JSON_OBJECT(KEY 'b' VALUE Det(Nondet(s)))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":\"BOB_1\"}",
"{\"b\":\"BOB_2\"}"),
+ Row.of("{\"a\":\"ALICE_3\"}",
"{\"b\":\"ALICE_4\"}")),
+ 4, // outer Det not cached (nondet operand)
+ 4),
+ // Filter ↔ JSON projection share Det via unified program.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'k' VALUE Det(s))"
+ + " FROM SourceTable WHERE Det(s) = 'BOB'",
+ List.of(Row.of("{\"k\":\"BOB\"}")),
+ 2,
+ 0),
+ // Shared inner JSON_OBJECT(KEY 'k' VALUE Det(s)) inside two
outer JSON_OBJECT
+ // projections — verifies CSE works when the cached node is
itself a JSON
+ // construction call (and validates the JSON helpers'
RexLocalRef deref path
+ // along the way).
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'outer1' VALUE JSON_OBJECT(KEY
'k' VALUE Det(s))),"
+ + " JSON_OBJECT(KEY 'outer2' VALUE
JSON_OBJECT(KEY 'k' VALUE Det(s)))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of(
+ "{\"outer1\":{\"k\":\"BOB\"}}",
+ "{\"outer2\":{\"k\":\"BOB\"}}"),
+ Row.of(
+ "{\"outer1\":{\"k\":\"ALICE\"}}",
+ "{\"outer2\":{\"k\":\"ALICE\"}}")),
+ 2,
+ 0),
+ // Shared inner JSON_ARRAY(Det(s)) inside two outer
JSON_OBJECT projections.
+ Arguments.of(
+ "SELECT JSON_OBJECT(KEY 'a' VALUE JSON_ARRAY(Det(s))),"
+ + " JSON_OBJECT(KEY 'b' VALUE
JSON_ARRAY(Det(s)))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("{\"a\":[\"BOB\"]}",
"{\"b\":[\"BOB\"]}"),
+ Row.of("{\"a\":[\"ALICE\"]}",
"{\"b\":[\"ALICE\"]}")),
+ 2,
+ 0),
+ // Shared inner JSON_OBJECT(KEY 'k' VALUE Det(s)) inside two
JSON_ARRAY
+ // projections.
+ Arguments.of(
+ "SELECT JSON_ARRAY(JSON_OBJECT(KEY 'k' VALUE Det(s))),"
+ + " JSON_ARRAY(JSON_OBJECT(KEY 'k' VALUE
Det(s)))"
+ + " FROM SourceTable",
+ List.of(
+ Row.of("[{\"k\":\"BOB\"}]",
"[{\"k\":\"BOB\"}]"),
+ Row.of("[{\"k\":\"ALICE\"}]",
"[{\"k\":\"ALICE\"}]")),
+ 2,
+ 0));
+ }
+
+ @Test
+ void testLocalRefReuseForMixedArgs() {
+ final List<Row> sourceData = List.of(Row.of("Bob"), Row.of("Alice"));
+ final int callSites = 2;
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+ CountingUpperScalarFunction.COUNT.set(0);
+ NonDeterministicCountingScalarFunction.COUNT.set(0);
+ CountingConcat3ScalarFunction.COUNT.set(0);
+
+ tEnv().createTemporarySystemFunction("Det",
CountingUpperScalarFunction.class);
+ tEnv().createTemporarySystemFunction(
+ "Nondet",
NonDeterministicCountingScalarFunction.class);
+ tEnv().createTemporarySystemFunction("Concat3",
CountingConcat3ScalarFunction.class);
+ tEnv().executeSql("CREATE TABLE SourceTable (s STRING) WITH
('connector' = 'COLLECTION')");
+
+ final List<Row> actual =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql(
+ "SELECT Concat3(Det(s), Nondet(s),
Det(s)),"
+ + " Concat3(Det(s), Nondet(s),
Det(s))"
+ + " FROM SourceTable")
+ .collect());
+
+ assertThat(actual)
+ .containsExactly(
+ Row.of("BOB/BOB_1/BOB", "BOB/BOB_2/BOB"),
+ Row.of("ALICE/ALICE_3/ALICE", "ALICE/ALICE_4/ALICE"));
+
+
assertThat(CountingUpperScalarFunction.COUNT.get()).isEqualTo(sourceData.size());
+ assertThat(NonDeterministicCountingScalarFunction.COUNT.get())
+ .isEqualTo(sourceData.size() * callSites);
+ // Concat3 is deterministic however has non-deterministic input
+ assertThat(CountingConcat3ScalarFunction.COUNT.get())
+ .isEqualTo(sourceData.size() * callSites);
+ }
+
+ @Test
+ void testCalcSharesSubExpressionBetweenFilterAndProjection() {
+ final List<Row> sourceData =
+ List.of(Row.of("Bob"), Row.of("Bob"), Row.of("Alice"),
Row.of("Alice"));
+
+ TestCollectionTableFactory.reset();
+ TestCollectionTableFactory.initData(sourceData);
+ CountingUpperScalarFunction.COUNT.set(0);
+
+ tEnv().createTemporarySystemFunction("CountingUpper",
CountingUpperScalarFunction.class);
+ tEnv().executeSql("CREATE TABLE SourceTable (s STRING) WITH
('connector' = 'COLLECTION')");
+
+ final List<Row> actual =
+ CollectionUtil.iteratorToList(
+ tEnv().executeSql(
+ "SELECT CountingUpper(s) FROM
SourceTable"
+ + " WHERE CountingUpper(s) =
'BOB' AND CountingUpper(s) <> 'BOB2'")
+ .collect());
+
+ assertThat(actual).containsExactly(Row.of("BOB"), Row.of("BOB"));
+
+ // Filter and projection share via the unified RexProgram, so the UDF
runs once per
+ // source row regardless of how many call sites name it.
+
assertThat(CountingUpperScalarFunction.COUNT.get()).isEqualTo(sourceData.size());
+ }
+
+ /**
+ * Pins the CASE-WHEN guard interaction with the RexLocalRef cache.
+ *
+ * <p>Prior to scoped caching, RexProgramBuilder collapsed the division
{@code a / b} into a
Review Comment:
> Prior to scoped caching,
Please instruct AI to not use temporal references. Something like:
```
- Add comments that are useful for long-term maintenance of the code base.
Avoid obvious JavaDocs, obvious parameter or return type descriptions. Don't
include short-term comments that have been directly derived from the prompt
such as "now uses" or "now works differently" or historical references like
"previously", those rather distract a reader who is new to the code base.
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala:
##########
@@ -1075,4 +1113,15 @@ class CodeGeneratorContext(
fieldTerm
}
+
+ def pushLocalRefScope(): Unit = {
Review Comment:
Add docs to all these methods. Maybe group all localref methods together in
this class in a dedicated comment section.
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala:
##########
@@ -177,69 +178,50 @@ object JsonGenerateUtils {
}
}
- /** Determines whether the given operand is a call to a JSON_OBJECT */
- def isJsonObjectOperand(operand: RexNode): Boolean = {
- operand match {
- case rexCall: RexCall =>
- rexCall.getOperator match {
- case JSON_OBJECT => true
- case _ => false
- }
- case _ => false
- }
- }
+ /** Determines whether the given operand is a call to a JSON_OBJECT. */
+ def isJsonObjectOperand(operand: RexNode, exprs: java.util.List[RexNode]):
Boolean =
Review Comment:
Can we update all operands that have been added in this PR? `exprs` is a
very generic term; lots of variables are called this way in the code gen stack.
```suggestion
def isJsonObjectOperand(operand: RexNode, localRefs:
java.util.List[RexNode]): Boolean =
```
##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/CodeGeneratorContext.scala:
##########
@@ -116,6 +115,24 @@ class CodeGeneratorContext(
val reusableInputUnboxingExprs: mutable.Map[(String, Int),
GeneratedExpression] =
mutable.Map[(String, Int), GeneratedExpression]()
+ // map of expressions for shared RexProgram exprList entries that will be
added only once
+ // exprList index -> expr
+ val reusableLocalRefExprs: mutable.LinkedHashMap[Int, GeneratedExpression] =
+ mutable.LinkedHashMap[Int, GeneratedExpression]()
+
+ // Stack of RexLocalRef cache scopes. The bottom scope IS
reusableLocalRefExprs and is read
Review Comment:
Could you give an example of a "scope"? Maybe rework this comment for future
maintainability?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]