This is an automated email from the ASF dual-hosted git repository. twalthr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit ab7c38aa223eb89e818c3a1ca2a1bcea1dffa137 Author: Ingo Bürk <ingo.bu...@tngtech.com> AuthorDate: Thu Oct 14 10:14:48 2021 +0200 [FLINK-24387][table-planner] Remove code generation dependency on actual JSON node --- .../table/planner/codegen/JsonGenerateUtils.scala | 64 ++++++++++------------ .../planner/codegen/calls/JsonArrayCallGen.scala | 2 +- .../planner/codegen/calls/JsonObjectCallGen.scala | 3 +- .../table/runtime/functions/SqlJsonUtils.java | 6 ++ 4 files changed, 38 insertions(+), 37 deletions(-) diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala index f59aad1..5cb49eb 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/JsonGenerateUtils.scala @@ -22,12 +22,13 @@ import org.apache.flink.table.api.DataTypes import org.apache.flink.table.planner.codegen.CodeGenUtils.{className, newName, rowFieldReadAccess, typeTerm} import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable.{JSON_ARRAY, JSON_OBJECT} import org.apache.flink.table.planner.utils.JavaScalaConversionUtil.toScala +import org.apache.flink.table.runtime.functions.SqlJsonUtils import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString import org.apache.flink.table.types.logical.LogicalTypeRoot._ -import org.apache.flink.table.types.logical.{ArrayType, LogicalType, MapType, MultisetType, RowType} +import org.apache.flink.table.types.logical._ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode -import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, ContainerNode, ObjectNode} +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.{ArrayNode, ObjectNode} import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.RawValue import org.apache.calcite.rex.{RexCall, RexNode} @@ -38,17 +39,18 @@ import java.time.format.DateTimeFormatter /** Utility for generating JSON function calls. */ object JsonGenerateUtils { + private def jsonUtils = className[SqlJsonUtils] + /** Returns a term which wraps the given `expression` into a [[JsonNode]]. If the operand * represents another JSON construction function, a raw node is used instead. */ def createNodeTerm( ctx: CodeGeneratorContext, - containerNodeTerm: String, expression: GeneratedExpression, operand: RexNode): String = { if (isJsonFunctionOperand(operand)) { - createRawNodeTerm(containerNodeTerm, expression) + createRawNodeTerm(expression) } else { - createNodeTerm(ctx, containerNodeTerm, expression) + createNodeTerm(ctx, expression) } } @@ -57,9 +59,8 @@ object JsonGenerateUtils { */ def createNodeTerm( ctx: CodeGeneratorContext, - containerNodeTerm: String, valueExpr: GeneratedExpression): String = { - createNodeTerm(ctx, containerNodeTerm, valueExpr.resultTerm, valueExpr.resultType) + createNodeTerm(ctx, valueExpr.resultTerm, valueExpr.resultType) } /** @@ -67,44 +68,43 @@ object JsonGenerateUtils { */ private def createNodeTerm( ctx: CodeGeneratorContext, - containerNodeTerm: String, term: String, logicalType: LogicalType): String = { + val nodeFactoryTerm = s"$jsonUtils.getNodeFactory()" + logicalType.getTypeRoot match { - case CHAR | VARCHAR => s"$containerNodeTerm.textNode($term.toString())" - case BOOLEAN => s"$containerNodeTerm.booleanNode($term)" - case DECIMAL => s"$containerNodeTerm.numberNode($term.toBigDecimal())" + case CHAR | VARCHAR => s"$nodeFactoryTerm.textNode($term.toString())" + case BOOLEAN => s"$nodeFactoryTerm.booleanNode($term)" + case DECIMAL => s"$nodeFactoryTerm.numberNode($term.toBigDecimal())" case TINYINT | SMALLINT | INTEGER | BIGINT | FLOAT | DOUBLE => - s"$containerNodeTerm.numberNode($term)" + s"$nodeFactoryTerm.numberNode($term)" case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => val formatter = s"${typeTerm(classOf[DateTimeFormatter])}.ISO_LOCAL_DATE_TIME" val isoTerm = s"$term.toLocalDateTime().format($formatter)" logicalType.getTypeRoot match { - case TIMESTAMP_WITHOUT_TIME_ZONE => s"$containerNodeTerm.textNode($isoTerm)" - case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"""$containerNodeTerm.textNode($isoTerm + "Z")""" + case TIMESTAMP_WITHOUT_TIME_ZONE => s"$nodeFactoryTerm.textNode($isoTerm)" + case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"""$nodeFactoryTerm.textNode($isoTerm + "Z")""" } case TIMESTAMP_WITH_TIME_ZONE => throw new CodeGenException(s"'TIMESTAMP WITH TIME ZONE' is not yet supported.") case BINARY | VARBINARY => - s"$containerNodeTerm.binaryNode($term)" + s"$nodeFactoryTerm.binaryNode($term)" case ARRAY => - val converterName = generateArrayConverter(ctx, containerNodeTerm, + val converterName = generateArrayConverter(ctx, logicalType.asInstanceOf[ArrayType].getElementType) s"$converterName($term)" case ROW => - val converterName = generateRowConverter(ctx, containerNodeTerm, - logicalType.asInstanceOf[RowType]) + val converterName = generateRowConverter(ctx, logicalType.asInstanceOf[RowType]) s"$converterName($term)" case MAP => val mapType = logicalType.asInstanceOf[MapType] - val converterName = generateMapConverter(ctx, containerNodeTerm, mapType.getKeyType, - mapType.getValueType) + val converterName = generateMapConverter(ctx, mapType.getKeyType, mapType.getValueType) s"$converterName($term)" case MULTISET => - val converterName = generateMapConverter(ctx, containerNodeTerm, + val converterName = generateMapConverter(ctx, logicalType.asInstanceOf[MultisetType].getElementType, DataTypes.INT().getLogicalType) s"$converterName($term)" @@ -116,13 +116,12 @@ object JsonGenerateUtils { /** * Returns a term which wraps the given `valueExpr` as a raw [[JsonNode]]. * - * @param containerNodeTerm Name of the [[ContainerNode]] from which to create the raw node. * @param valueExpr Generated expression of the value which should be wrapped. * @return Generate code fragment creating the raw node. */ - def createRawNodeTerm(containerNodeTerm: String, valueExpr: GeneratedExpression): String = { + private def createRawNodeTerm(valueExpr: GeneratedExpression): String = { s""" - |$containerNodeTerm.rawValueNode( + |$jsonUtils.getNodeFactory().rawValueNode( | new ${typeTerm(classOf[RawValue])}(${valueExpr.resultTerm}.toString())) |""".stripMargin } @@ -153,7 +152,6 @@ object JsonGenerateUtils { /** Generates a method to convert arrays into [[ArrayNode]]. */ private def generateArrayConverter( ctx: CodeGeneratorContext, - containerNodeTerm: String, elementType: LogicalType): String = { val fieldAccessCode = toExternalTypeTerm( rowFieldReadAccess(ctx, "i", "arrData", elementType), elementType) @@ -162,10 +160,9 @@ object JsonGenerateUtils { val methodCode = s""" |private ${className[ArrayNode]} $methodName(${CodeGenUtils.ARRAY_DATA} arrData) { - | ${className[ArrayNode]} arrNode = $containerNodeTerm.arrayNode(); + | ${className[ArrayNode]} arrNode = $jsonUtils.getNodeFactory().arrayNode(); | for (int i = 0; i < arrData.size(); i++) { - | arrNode.add( - | ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, elementType)}); + | arrNode.add(${createNodeTerm(ctx, fieldAccessCode, elementType)}); | } | | return arrNode; @@ -179,7 +176,6 @@ object JsonGenerateUtils { /** Generates a method to convert rows into [[ObjectNode]]. */ private def generateRowConverter( ctx: CodeGeneratorContext, - containerNodeTerm: String, rowType: RowType): String = { val populateObjectCode = toScala(rowType.getFieldNames).zipWithIndex.map { @@ -190,7 +186,7 @@ object JsonGenerateUtils { s""" |objNode.set("$fieldName", - | ${createNodeTerm(ctx, containerNodeTerm, fieldAccessCode, fieldType)}); + | ${createNodeTerm(ctx, fieldAccessCode, fieldType)}); |""".stripMargin }.mkString @@ -198,7 +194,7 @@ object JsonGenerateUtils { val methodCode = s""" |private ${className[ObjectNode]} $methodName(${CodeGenUtils.ROW_DATA} rowData) { - | ${className[ObjectNode]} objNode = $containerNodeTerm.objectNode(); + | ${className[ObjectNode]} objNode = $jsonUtils.getNodeFactory().objectNode(); | $populateObjectCode | | return objNode; @@ -212,7 +208,6 @@ object JsonGenerateUtils { /** Generates a method to convert maps into [[ObjectNode]]. */ private def generateMapConverter( ctx: CodeGeneratorContext, - containerNodeTerm: String, keyType: LogicalType, valueType: LogicalType): String = { if (!isCharacterString(keyType)) { @@ -230,7 +225,7 @@ object JsonGenerateUtils { val methodCode = s""" |private ${className[ObjectNode]} $methodName(${CodeGenUtils.MAP_DATA} mapData) { - | ${className[ObjectNode]} objNode = $containerNodeTerm.objectNode(); + | ${className[ObjectNode]} objNode = $jsonUtils.getNodeFactory().objectNode(); | for (int i = 0; i < mapData.size(); i++) { | java.lang.String key = $keyAccessCode; | if (key == null) { @@ -238,8 +233,7 @@ object JsonGenerateUtils { | + " was null. This is not supported during conversion to JSON."); | } | - | objNode.set(key, - | ${createNodeTerm(ctx, containerNodeTerm, valueAccessCode, valueType)}); + | objNode.set(key, ${createNodeTerm(ctx, valueAccessCode, valueType)}); | } | | return objNode; diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala index f5275e9..60b05ed 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonArrayCallGen.scala @@ -47,7 +47,7 @@ class JsonArrayCallGen(call: RexCall) extends CallGenerator { val onNull = getOnNullBehavior(operands.head) val populateNodeCode = operands.zipWithIndex.drop(1).map { case (elementExpr, elementIdx) => - val elementTerm = createNodeTerm(ctx, nodeTerm, elementExpr, call.operands.get(elementIdx)) + val elementTerm = createNodeTerm(ctx, elementExpr, call.operands.get(elementIdx)) onNull match { case NULL_ON_NULL => diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala index f946a6d..967089a 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/JsonObjectCallGen.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.table.planner.codegen.calls + import org.apache.flink.table.planner.codegen.CodeGenUtils._ import org.apache.flink.table.planner.codegen.JsonGenerateUtils.{createNodeTerm, getOnNullBehavior} import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, GeneratedExpression} @@ -56,7 +57,7 @@ class JsonObjectCallGen(call: RexCall) extends CallGenerator { val onNull = getOnNullBehavior(operands.head) val populateNodeCode = operands.zipWithIndex.drop(1).grouped(2).map { case Seq((keyExpr, _), (valueExpr, valueIdx)) => - val valueTerm = createNodeTerm(ctx, nodeTerm, valueExpr, call.operands.get(valueIdx)) + val valueTerm = createNodeTerm(ctx, valueExpr, call.operands.get(valueIdx)) onNull match { case NULL_ON_NULL => diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java index b942486..92e2583 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/SqlJsonUtils.java @@ -26,6 +26,7 @@ import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessin import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.JsonNodeFactory; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; /** @@ -39,6 +40,11 @@ public class SqlJsonUtils { private static final JsonFactory JSON_FACTORY = new JsonFactory(); private static final ObjectMapper MAPPER = new ObjectMapper(JSON_FACTORY); + /** Returns the {@link JsonNodeFactory} for creating nodes. */ + public static JsonNodeFactory getNodeFactory() { + return MAPPER.getNodeFactory(); + } + /** Returns a new {@link ObjectNode}. */ public static ObjectNode createObjectNode() { return MAPPER.createObjectNode();