This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ab7c38aa223eb89e818c3a1ca2a1bcea1dffa137
Author: Ingo Bürk <ingo.bu...@tngtech.com>
AuthorDate: Thu Oct 14 10:14:48 2021 +0200

    [FLINK-24387][table-planner] Remove code generation dependency on actual JSON node
---
 .../table/planner/codegen/JsonGenerateUtils.scala  | 64 ++++++++++------------
 .../planner/codegen/calls/JsonArrayCallGen.scala   |  2 +-
 .../planner/codegen/calls/JsonObjectCallGen.scala  |  3 +-
 .../table/runtime/functions/SqlJsonUtils.java      |  6 ++
 4 files changed, 38 insertions(+), 37 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
index f59aad1..5cb49eb 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala
@@ -22,12 +22,13 @@ import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, 
newName, rowFieldReadAccess, typeTerm}
 import 
org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.{JSON_ARRAY, 
JSON_OBJECT}
 import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala
+import org.apache.flink.table.runtime.functions.SqlJsonUtils
 import 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString
 import org.apache.flink.table.types.logical.LogicalTypeRoot._
-import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType, 
MultisetType, RowType}
+import org.apache.flink.table.types.logical._
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
-import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode,
 ContainerNode, ObjectNode}
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode,
 ObjectNode}
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue
 
 import org.apache.calcite.rex.{RexCall, RexNode}
@@ -38,17 +39,18 @@ import java.time.format.DateTimeFormatter
 /** Utility for generating JSON function calls. */
 object JsonGenerateUtils {
 
+  private def jsonUtils = className[SqlJsonUtils]
+
   /** Returns a term which wraps the given `expression` into a [[JsonNode]]. 
If the operand
    * represents another JSON construction function, a raw node is used 
instead. */
   def createNodeTerm(
       ctx: CodeGeneratorContext,
-      containerNodeTerm: String,
       expression: GeneratedExpression,
       operand: RexNode): String = {
     if (isJsonFunctionOperand(operand)) {
-      createRawNodeTerm(containerNodeTerm, expression)
+      createRawNodeTerm(expression)
     } else {
-      createNodeTerm(ctx, containerNodeTerm, expression)
+      createNodeTerm(ctx, expression)
     }
   }
 
@@ -57,9 +59,8 @@ object JsonGenerateUtils {
    */
   def createNodeTerm(
       ctx: CodeGeneratorContext,
-      containerNodeTerm: String,
       valueExpr: GeneratedExpression): String = {
-    createNodeTerm(ctx, containerNodeTerm, valueExpr.resultTerm, 
valueExpr.resultType)
+    createNodeTerm(ctx, valueExpr.resultTerm, valueExpr.resultType)
   }
 
   /**
@@ -67,44 +68,43 @@ object JsonGenerateUtils {
    */
   private def createNodeTerm(
       ctx: CodeGeneratorContext,
-      containerNodeTerm: String,
       term: String,
       logicalType: LogicalType): String = {
+    val nodeFactoryTerm = s"$jsonUtils.getNodeFactory()"
+
     logicalType.getTypeRoot match {
-      case CHAR | VARCHAR => s"$containerNodeTerm.textNode($term.toString())"
-      case BOOLEAN => s"$containerNodeTerm.booleanNode($term)"
-      case DECIMAL => s"$containerNodeTerm.numberNode($term.toBigDecimal())"
+      case CHAR | VARCHAR => s"$nodeFactoryTerm.textNode($term.toString())"
+      case BOOLEAN => s"$nodeFactoryTerm.booleanNode($term)"
+      case DECIMAL => s"$nodeFactoryTerm.numberNode($term.toBigDecimal())"
       case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE =>
-        s"$containerNodeTerm.numberNode($term)"
+        s"$nodeFactoryTerm.numberNode($term)"
       case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
         val formatter = 
s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME"
         val isoTerm = s"$term.toLocalDateTime().format($formatter)"
         logicalType.getTypeRoot match {
-          case TIMESTAMP_WITHOUT_TIME_ZONE => 
s"$containerNodeTerm.textNode($isoTerm)"
-          case TIMESTAMP_WITH_LOCAL_TIME_ZONE => 
s"""$containerNodeTerm.textNode($isoTerm + "Z")"""
+          case TIMESTAMP_WITHOUT_TIME_ZONE => 
s"$nodeFactoryTerm.textNode($isoTerm)"
+          case TIMESTAMP_WITH_LOCAL_TIME_ZONE => 
s"""$nodeFactoryTerm.textNode($isoTerm + "Z")"""
         }
       case TIMESTAMP_WITH_TIME_ZONE =>
         throw new CodeGenException(s"'TIMESTAMP WITH TIME ZONE' is not yet 
supported.")
       case BINARY | VARBINARY =>
-        s"$containerNodeTerm.binaryNode($term)"
+        s"$nodeFactoryTerm.binaryNode($term)"
       case ARRAY =>
-        val converterName = generateArrayConverter(ctx, containerNodeTerm,
+        val converterName = generateArrayConverter(ctx,
           logicalType.asInstanceOf[ArrayType].getElementType)
 
         s"$converterName($term)"
       case ROW =>
-        val converterName = generateRowConverter(ctx, containerNodeTerm,
-          logicalType.asInstanceOf[RowType])
+        val converterName = generateRowConverter(ctx, 
logicalType.asInstanceOf[RowType])
 
         s"$converterName($term)"
       case MAP =>
         val mapType = logicalType.asInstanceOf[MapType]
-        val converterName = generateMapConverter(ctx, containerNodeTerm, 
mapType.getKeyType,
-          mapType.getValueType)
+        val converterName = generateMapConverter(ctx, mapType.getKeyType, 
mapType.getValueType)
 
         s"$converterName($term)"
       case MULTISET =>
-        val converterName = generateMapConverter(ctx, containerNodeTerm,
+        val converterName = generateMapConverter(ctx,
           logicalType.asInstanceOf[MultisetType].getElementType, 
DataTypes.INT().getLogicalType)
 
         s"$converterName($term)"
@@ -116,13 +116,12 @@ object JsonGenerateUtils {
   /**
    * Returns a term which wraps the given `valueExpr` as a raw [[JsonNode]].
    *
-   * @param containerNodeTerm Name of the [[ContainerNode]] from which to 
create the raw node.
    * @param valueExpr Generated expression of the value which should be 
wrapped.
    * @return Generate code fragment creating the raw node.
    */
-  def createRawNodeTerm(containerNodeTerm: String, valueExpr: 
GeneratedExpression): String = {
+  private def createRawNodeTerm(valueExpr: GeneratedExpression): String = {
     s"""
-       |$containerNodeTerm.rawValueNode(
+       |$jsonUtils.getNodeFactory().rawValueNode(
        |    new 
${typeTerm(classOf[RawValue])}(${valueExpr.resultTerm}.toString()))
        |""".stripMargin
   }
@@ -153,7 +152,6 @@ object JsonGenerateUtils {
   /** Generates a method to convert arrays into [[ArrayNode]]. */
   private def generateArrayConverter(
       ctx: CodeGeneratorContext,
-      containerNodeTerm: String,
       elementType: LogicalType): String = {
     val fieldAccessCode = toExternalTypeTerm(
       rowFieldReadAccess(ctx, "i", "arrData", elementType), elementType)
@@ -162,10 +160,9 @@ object JsonGenerateUtils {
     val methodCode =
       s"""
          |private ${className[ArrayNode]} 
$methodName(${CodeGenUtils.ARRAY_DATA} arrData) {
-         |    ${className[ArrayNode]} arrNode = $containerNodeTerm.arrayNode();
+         |    ${className[ArrayNode]} arrNode = 
$jsonUtils.getNodeFactory().arrayNode();
          |    for (int i = 0; i < arrData.size(); i++) {
-         |        arrNode.add(
-         |            ${createNodeTerm(ctx, containerNodeTerm, 
fieldAccessCode, elementType)});
+         |        arrNode.add(${createNodeTerm(ctx, fieldAccessCode, 
elementType)});
          |    }
          |
          |    return arrNode;
@@ -179,7 +176,6 @@ object JsonGenerateUtils {
   /** Generates a method to convert rows into [[ObjectNode]]. */
   private def generateRowConverter(
       ctx: CodeGeneratorContext,
-      containerNodeTerm: String,
       rowType: RowType): String = {
 
     val populateObjectCode = toScala(rowType.getFieldNames).zipWithIndex.map {
@@ -190,7 +186,7 @@ object JsonGenerateUtils {
 
         s"""
            |objNode.set("$fieldName",
-           |    ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, 
fieldType)});
+           |    ${createNodeTerm(ctx, fieldAccessCode, fieldType)});
            |""".stripMargin
     }.mkString
 
@@ -198,7 +194,7 @@ object JsonGenerateUtils {
     val methodCode =
       s"""
          |private ${className[ObjectNode]} 
$methodName(${CodeGenUtils.ROW_DATA} rowData) {
-         |    ${className[ObjectNode]} objNode = 
$containerNodeTerm.objectNode();
+         |    ${className[ObjectNode]} objNode = 
$jsonUtils.getNodeFactory().objectNode();
          |    $populateObjectCode
          |
          |    return objNode;
@@ -212,7 +208,6 @@ object JsonGenerateUtils {
   /** Generates a method to convert maps into [[ObjectNode]]. */
   private def generateMapConverter(
       ctx: CodeGeneratorContext,
-      containerNodeTerm: String,
       keyType: LogicalType,
       valueType: LogicalType): String = {
     if (!isCharacterString(keyType)) {
@@ -230,7 +225,7 @@ object JsonGenerateUtils {
     val methodCode =
       s"""
          |private ${className[ObjectNode]} 
$methodName(${CodeGenUtils.MAP_DATA} mapData) {
-         |    ${className[ObjectNode]} objNode = 
$containerNodeTerm.objectNode();
+         |    ${className[ObjectNode]} objNode = 
$jsonUtils.getNodeFactory().objectNode();
          |    for (int i = 0; i < mapData.size(); i++) {
          |        java.lang.String key = $keyAccessCode;
          |        if (key == null) {
@@ -238,8 +233,7 @@ object JsonGenerateUtils {
          |                + " was null. This is not supported during 
conversion to JSON.");
          |        }
          |
-         |        objNode.set(key,
-         |            ${createNodeTerm(ctx, containerNodeTerm, 
valueAccessCode, valueType)});
+         |        objNode.set(key, ${createNodeTerm(ctx, valueAccessCode, 
valueType)});
          |    }
          |
          |    return objNode;
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala
index f5275e9..60b05ed 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala
@@ -47,7 +47,7 @@ class JsonArrayCallGen(call: RexCall) extends CallGenerator {
     val onNull = getOnNullBehavior(operands.head)
     val populateNodeCode = operands.zipWithIndex.drop(1).map {
       case (elementExpr, elementIdx) =>
-        val elementTerm = createNodeTerm(ctx, nodeTerm, elementExpr, 
call.operands.get(elementIdx))
+        val elementTerm = createNodeTerm(ctx, elementExpr, 
call.operands.get(elementIdx))
 
         onNull match {
           case NULL_ON_NULL =>
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
index f946a6d..967089a 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala
@@ -17,6 +17,7 @@
  */
 
 package org.apache.flink.table.planner.codegen.calls
+
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import 
org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNodeTerm, 
getOnNullBehavior}
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
GeneratedExpression}
@@ -56,7 +57,7 @@ class JsonObjectCallGen(call: RexCall) extends CallGenerator {
     val onNull = getOnNullBehavior(operands.head)
     val populateNodeCode = operands.zipWithIndex.drop(1).grouped(2).map {
       case Seq((keyExpr, _), (valueExpr, valueIdx)) =>
-        val valueTerm = createNodeTerm(ctx, nodeTerm, valueExpr, 
call.operands.get(valueIdx))
+        val valueTerm = createNodeTerm(ctx, valueExpr, 
call.operands.get(valueIdx))
 
         onNull match {
           case NULL_ON_NULL =>
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
index b942486..92e2583 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 /**
@@ -39,6 +40,11 @@ public class SqlJsonUtils {
     private static final JsonFactory JSON_FACTORY = new JsonFactory();
     private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY);
 
+    /** Returns the {@link JsonNodeFactory} for creating nodes. */
+    public static JsonNodeFactory getNodeFactory() {
+        return MAPPER.getNodeFactory();
+    }
+
     /** Returns a new {@link ObjectNode}. */
     public static ObjectNode createObjectNode() {
         return MAPPER.createObjectNode();

Reply via email to