snuyanzin commented on code in PR #28113:
URL: https://github.com/apache/flink/pull/28113#discussion_r3187935874


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -487,15 +544,59 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, 
nullableInput: Boolean)
         generateNullLiteral(resultType)
 
       // We only support the JSON function inside of JSON_OBJECT or JSON_ARRAY
-      case (operand: RexNode, i) if isSupportedJsonOperand(operand, call, i) =>
+      case (operand: RexNode, i)
+          if isSupportedJsonOperand(
+            operand,
+            call,
+            i,
+            if (rexProgram == null) null else rexProgram.getExprList) =>
         generateJsonCall(operand)
 
+      case (o @ _, i) if condIdxs.contains(i) => visitOperandInScopedCache(o)
+
       case (o @ _, _) => o.accept(this)
     }
 
     generateCallExpression(ctx, call, operands, resultType)
   }
 
+  /**
+   * Indices of `call`'s operands that are NOT unconditionally evaluated at 
runtime. Used to scope
+   * the RexLocalRef cache so that bodies cached while visiting these operands 
are not hoisted out
+   * of the surrounding short-circuit / if-block.
+   *
+   *   - `CASE(when_1, then_1, when_2, then_2, ..., else)`: only `when_1` is 
unconditional.
+   *   - `AND(a_0, a_1, ..., a_n)` / `OR(...)`: only `a_0` is unconditional; 
subsequent operands are
+   *     short-circuited by the operator semantics and the codegen.
+   */
+  private def conditionalOperandIndices(call: RexCall): Set[Int] = 
call.getKind match {
+    case SqlKind.CASE | SqlKind.AND | SqlKind.OR | SqlKind.COALESCE =>
+      (1 until call.getOperands.size).toSet
+    case _ => Set.empty
+  }
+
+  private def visitOperandInScopedCache(operand: RexNode): GeneratedExpression 
= {
+    ctx.pushLocalRefScope()
+    val (operandExpr, scopedBodies) =
+      try {
+        val expr = operand.accept(this)
+        val popped = ctx.popLocalRefScope()
+        (expr, popped.values.map(_.code).mkString("\n"))
+      } catch {
+        case t: Throwable =>
+          ctx.popLocalRefScope()
+          throw t
+      }
+    if (scopedBodies.isEmpty) operandExpr

Review Comment:
   done



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala:
##########
@@ -177,69 +178,50 @@ object JsonGenerateUtils {
     }
   }
 
-  /** Determines whether the given operand is a call to a JSON_OBJECT */
-  def isJsonObjectOperand(operand: RexNode): Boolean = {
-    operand match {
-      case rexCall: RexCall =>
-        rexCall.getOperator match {
-          case JSON_OBJECT => true
-          case _ => false
-        }
-      case _ => false
-    }
-  }
+  /** Determines whether the given operand is a call to a JSON_OBJECT. */
+  def isJsonObjectOperand(operand: RexNode, exprs: java.util.List[RexNode]): 
Boolean =

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to