This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 70f0081264cf01e5c94ddc692e4cb9433c2a6007 Author: JingsongLi <lzljs3620...@aliyun.com> AuthorDate: Thu Aug 29 17:10:36 2019 +0800 [FLINK-13775][table-planner-blink] Rename RexNodeConverter to ExpressionConverter --- .../ExpressionConverter.java} | 10 +++++--- .../planner/plan/QueryOperationConverter.java | 30 +++++++++++----------- .../codegen/agg/DeclarativeAggCodeGen.scala | 5 ++-- .../planner/codegen/agg/DistinctAggCodeGen.scala | 4 +-- .../planner/codegen/agg/ImperativeAggCodeGen.scala | 5 ++-- .../codegen/agg/batch/AggCodeGenHelper.scala | 18 +++++++------ .../codegen/agg/batch/HashAggCodeGenHelper.scala | 13 +++++----- .../codegen/agg/batch/WindowCodeGenerator.scala | 5 ++-- .../PushFilterIntoTableSourceScanRule.scala | 4 +-- .../table/planner/sources/TableSourceUtil.scala | 4 +-- .../batch/table/stringexpr/SetOperatorsTest.scala | 2 +- 11 files changed, 54 insertions(+), 46 deletions(-) diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java similarity index 99% rename from flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java rename to flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java index 2e3159d..b1e8e96 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.table.planner.expressions; +package org.apache.flink.table.planner.expressions.converter; import org.apache.flink.table.api.TableException; import org.apache.flink.table.dataformat.Decimal; @@ -47,6 +47,8 @@ import org.apache.flink.table.planner.calcite.FlinkPlannerImpl; import org.apache.flink.table.planner.calcite.FlinkRelBuilder; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexFieldVariable; +import org.apache.flink.table.planner.expressions.RexNodeExpression; +import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor; import org.apache.flink.table.planner.functions.InternalFunctionDefinitions; import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable; import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction; @@ -117,7 +119,7 @@ import static org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT /** * Visit expression to generator {@link RexNode}. */ -public class RexNodeConverter implements ExpressionVisitor<RexNode> { +public class ExpressionConverter implements ExpressionVisitor<RexNode> { private final RelBuilder relBuilder; private final FlinkTypeFactory typeFactory; @@ -125,7 +127,7 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> { // store mapping from BuiltInFunctionDefinition to it's RexNodeConversion. private final Map<FunctionDefinition, RexNodeConversion> conversionsOfBuiltInFunc = new IdentityHashMap<>(); - public RexNodeConverter(RelBuilder relBuilder) { + public ExpressionConverter(RelBuilder relBuilder) { this.relBuilder = relBuilder; this.typeFactory = (FlinkTypeFactory) relBuilder.getRexBuilder().getTypeFactory(); @@ -358,7 +360,7 @@ public class RexNodeConverter implements ExpressionVisitor<RexNode> { private List<RexNode> convertCallChildren(List<Expression> children) { return children.stream() - .map(expression -> expression.accept(RexNodeConverter.this)) + .map(expression -> expression.accept(ExpressionConverter.this)) .collect(Collectors.toList()); } diff --git a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java index 481f125..7a4da1f 100644 --- a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java +++ b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java @@ -59,9 +59,9 @@ import org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute; import org.apache.flink.table.planner.expressions.PlannerWindowEnd; import org.apache.flink.table.planner.expressions.PlannerWindowReference; import org.apache.flink.table.planner.expressions.PlannerWindowStart; -import org.apache.flink.table.planner.expressions.RexNodeConverter; import org.apache.flink.table.planner.expressions.RexNodeExpression; import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor; +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter; import org.apache.flink.table.planner.functions.utils.TableSqlFunction; import org.apache.flink.table.planner.operations.DataStreamQueryOperation; import org.apache.flink.table.planner.operations.PlannerQueryOperation; @@ -119,7 +119,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod private final FlinkRelBuilder relBuilder; private final SingleRelVisitor singleRelVisitor = new SingleRelVisitor(); private final LookupCallResolver callResolver; - private final RexNodeConverter rexNodeConverter; + private final ExpressionConverter expressionConverter; private final AggregateVisitor aggregateVisitor = new AggregateVisitor(); private final TableAggregateVisitor tableAggregateVisitor = new TableAggregateVisitor(); private final JoinExpressionVisitor joinExpressionVisitor = new JoinExpressionVisitor(); @@ -127,7 +127,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod public QueryOperationConverter(FlinkRelBuilder relBuilder, FunctionLookup functionCatalog) { this.relBuilder = relBuilder; this.callResolver = new LookupCallResolver(functionCatalog); - this.rexNodeConverter = new RexNodeConverter(relBuilder); + this.expressionConverter = new ExpressionConverter(relBuilder); } @Override @@ -499,7 +499,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod Expression aggregate = unresolvedCall.getChildren().get(0); if (isFunctionOfKind(aggregate, AGGREGATE)) { return aggregate.accept(callResolver).accept( - new AggCallVisitor(relBuilder, rexNodeConverter, aggregateName, false)); + new AggCallVisitor(relBuilder, expressionConverter, aggregateName, false)); } } throw new TableException("Expected named aggregate. Got: " + unresolvedCall); @@ -514,15 +514,15 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod private final RelBuilder relBuilder; private final SqlAggFunctionVisitor sqlAggFunctionVisitor; - private final RexNodeConverter rexNodeConverter; + private final ExpressionConverter expressionConverter; private final String name; private final boolean isDistinct; - public AggCallVisitor(RelBuilder relBuilder, RexNodeConverter rexNodeConverter, String name, + public AggCallVisitor(RelBuilder relBuilder, ExpressionConverter expressionConverter, String name, boolean isDistinct) { this.relBuilder = relBuilder; this.sqlAggFunctionVisitor = new SqlAggFunctionVisitor((FlinkTypeFactory) relBuilder.getTypeFactory()); - this.rexNodeConverter = rexNodeConverter; + this.expressionConverter = expressionConverter; this.name = name; this.isDistinct = isDistinct; } @@ -532,7 +532,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod FunctionDefinition def = call.getFunctionDefinition(); if (BuiltInFunctionDefinitions.DISTINCT == def) { Expression innerAgg = call.getChildren().get(0); - return innerAgg.accept(new AggCallVisitor(relBuilder, rexNodeConverter, name, true)); + return innerAgg.accept(new AggCallVisitor(relBuilder, expressionConverter, name, true)); } else { SqlAggFunction sqlAggFunction = call.accept(sqlAggFunctionVisitor); return relBuilder.aggregateCall( @@ -541,7 +541,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod false, null, name, - call.getChildren().stream().map(expr -> expr.accept(rexNodeConverter)) + call.getChildren().stream().map(expr -> expr.accept(expressionConverter)) .collect(Collectors.toList())); } } @@ -557,7 +557,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod @Override public AggCall visit(CallExpression call) { if (isFunctionOfKind(call, TABLE_AGGREGATE)) { - return call.accept(new TableAggCallVisitor(relBuilder, rexNodeConverter)); + return call.accept(new TableAggCallVisitor(relBuilder, expressionConverter)); } return defaultMethod(call); } @@ -571,12 +571,12 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod private final RelBuilder relBuilder; private final SqlAggFunctionVisitor sqlAggFunctionVisitor; - private final RexNodeConverter rexNodeConverter; + private final ExpressionConverter expressionConverter; - public TableAggCallVisitor(RelBuilder relBuilder, RexNodeConverter rexNodeConverter) { + public TableAggCallVisitor(RelBuilder relBuilder, ExpressionConverter expressionConverter) { this.relBuilder = relBuilder; this.sqlAggFunctionVisitor = new SqlAggFunctionVisitor((FlinkTypeFactory) relBuilder.getTypeFactory()); - this.rexNodeConverter = rexNodeConverter; + this.expressionConverter = expressionConverter; } @Override @@ -588,7 +588,7 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod false, null, sqlAggFunction.toString(), - call.getChildren().stream().map(expr -> expr.accept(rexNodeConverter)).collect(toList())); + call.getChildren().stream().map(expr -> expr.accept(expressionConverter)).collect(toList())); } @Override @@ -599,6 +599,6 @@ public class QueryOperationConverter extends QueryOperationDefaultVisitor<RelNod } private RexNode convertExprToRexNode(Expression expr) { - return expr.accept(callResolver).accept(rexNodeConverter); + return expr.accept(callResolver).accept(expressionConverter); } } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala index afa4e5b..37941bf 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala @@ -22,7 +22,8 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression} import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey, toRexInputRef} -import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter, RexNodeExpression} +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter +import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeExpression} import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.planner.plan.utils.AggregateInfo import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType, fromLogicalTypeToDataType} @@ -65,7 +66,7 @@ class DeclarativeAggCodeGen( private val bufferTerms = function.aggBufferAttributes .map(a => s"agg${aggInfo.aggIndex}_${a.getName}") - private val rexNodeGen = new RexNodeConverter(relBuilder) + private val rexNodeGen = new ExpressionConverter(relBuilder) private val bufferNullTerms = { val exprCodegen = new ExprCodeGenerator(ctx, false) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala index 8db2d6d..22f0ba0 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala @@ -27,7 +27,7 @@ import org.apache.flink.table.planner.codegen.GenerateUtils.{generateFieldAccess import org.apache.flink.table.planner.codegen.GeneratedExpression._ import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._ import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression} -import org.apache.flink.table.planner.expressions.RexNodeConverter +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.plan.utils.DistinctInfo import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType import org.apache.flink.table.types.DataType @@ -91,7 +91,7 @@ class DistinctAggCodeGen( val isValueChangedTerm: String = s"is_distinct_value_changed_$distinctIndex" val isValueEmptyTerm: String = s"is_distinct_value_empty_$distinctIndex" val valueGenerator: DistinctValueGenerator = createDistinctValueGenerator() - private val rexNodeGen = new RexNodeConverter(relBuilder) + private val rexNodeGen = new ExpressionConverter(relBuilder) addReusableDistinctAccumulator() diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala index e5d0ea4..457310c 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala @@ -25,8 +25,9 @@ import org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._ import org.apache.flink.table.planner.codegen.{CodeGenException, CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression} import org.apache.flink.table.planner.dataview.DataViewSpec +import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef -import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter} +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAggFunctionUDIMethod, getAggUserDefinedInputTypes, getUserDefinedMethod, internalTypesToClasses, signatureToString} import org.apache.flink.table.planner.plan.utils.AggregateInfo import org.apache.flink.table.planner.utils.SingleElementIterator @@ -112,7 +113,7 @@ class ImperativeAggCodeGen( private val externalResultType = aggInfo.externalResultType private val internalResultType = fromDataTypeToLogicalType(externalResultType) - private val rexNodeGen = new RexNodeConverter(relBuilder) + private val rexNodeGen = new ExpressionConverter(relBuilder) val viewSpecs: Array[DataViewSpec] = aggInfo.viewSpecs // add reusable dataviews to context diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala index c02ae98..389cd7e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala @@ -27,8 +27,9 @@ import org.apache.flink.table.functions.{AggregateFunction, UserDefinedFunction} import org.apache.flink.table.planner.codegen.CodeGenUtils._ import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.STREAM_RECORD import org.apache.flink.table.planner.codegen._ +import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef -import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter} +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction, getAggUserDefinedInputTypes} import org.apache.flink.table.runtime.context.ExecutionContextImpl @@ -283,7 +284,7 @@ object AggCodeGenHelper { aggBufferNames: Array[Array[String]], aggBufferTypes: Array[Array[LogicalType]]): Seq[GeneratedExpression] = { val exprCodegen = new ExprCodeGenerator(ctx, false) - val converter = new RexNodeConverter(builder) + val converter = new ExpressionConverter(builder) val accessAuxGroupingExprs = auxGrouping.indices.map { idx => newLocalReference(aggBufferNames(idx)(0), aggBufferTypes(idx)(0)) @@ -338,7 +339,7 @@ object AggCodeGenHelper { case (agg: AggregateFunction[_, _]) => Some(agg) }.map { - case (expr: Expression) => expr.accept(new RexNodeConverter(builder)) + case (expr: Expression) => expr.accept(new ExpressionConverter(builder)) case t@_ => t }.map { case (rex: RexNode) => exprCodegen.generateExpression(rex) @@ -490,7 +491,7 @@ object AggCodeGenHelper { val idx = auxGrouping.length + aggIndex (agg, idx) }.map { - case (expr: Expression) => expr.accept(new RexNodeConverter(builder)) + case (expr: Expression) => expr.accept(new ExpressionConverter(builder)) case t@_ => t }.map { case (rex: RexNode) => exprCodegen.generateExpression(rex) @@ -535,7 +536,8 @@ object AggCodeGenHelper { }.zip(aggBufferExprs.slice(auxGrouping.length, aggBufferExprs.size)).map { // DeclarativeAggregateFunction case ((expr: Expression), aggBufVar) => - val mergeExpr = exprCodegen.generateExpression(expr.accept(new RexNodeConverter(builder))) + val mergeExpr = exprCodegen.generateExpression( + expr.accept(new ExpressionConverter(builder))) s""" |${mergeExpr.code} |${aggBufVar.nullTerm} = ${mergeExpr.nullTerm}; @@ -548,7 +550,7 @@ object AggCodeGenHelper { val (inputIndex, inputType) = argsMapping(aggIndex)(0) val inputRef = toRexInputRef(builder, inputIndex, inputType) val inputExpr = exprCodegen.generateExpression( - inputRef.accept(new RexNodeConverter(builder))) + inputRef.accept(new ExpressionConverter(builder))) val singleIterableClass = classOf[SingleElementIterator[_]].getCanonicalName val externalAccT = getAccumulatorTypeOfAggregateFunction(agg) @@ -599,7 +601,7 @@ object AggCodeGenHelper { }.zip(aggBufferExprs.slice(auxGrouping.length, aggBufferExprs.size)).map { // DeclarativeAggregateFunction case ((expr: Expression, aggCall: AggregateCall), aggBufVar) => - val accExpr = exprCodegen.generateExpression(expr.accept(new RexNodeConverter(builder))) + val accExpr = exprCodegen.generateExpression(expr.accept(new ExpressionConverter(builder))) (s""" |${accExpr.code} |${aggBufVar.nullTerm} = ${accExpr.nullTerm}; @@ -616,7 +618,7 @@ object AggCodeGenHelper { val inputExprs = inFields.map { f => val inputRef = toRexInputRef(builder, f._1, f._2) - exprCodegen.generateExpression(inputRef.accept(new RexNodeConverter(builder))) + exprCodegen.generateExpression(inputRef.accept(new ExpressionConverter(builder))) } val externalUDITypes = getAggUserDefinedInputTypes( diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala index 8cdaf05..8faca38 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala @@ -27,8 +27,9 @@ import org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAcc import org.apache.flink.table.planner.codegen._ import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator +import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef -import org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, RexNodeConverter} +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.planner.plan.utils.SortUtil import org.apache.flink.table.runtime.generated.{NormalizedKeyComputer, RecordComparator} @@ -206,7 +207,7 @@ object HashAggCodeGenHelper { val initAggCallBufferExprs = aggregates.flatMap(a => a.asInstanceOf[DeclarativeAggregateFunction].initialValuesExpressions) - .map(_.accept(new RexNodeConverter(builder))) + .map(_.accept(new ExpressionConverter(builder))) .map(exprCodegen.generateExpression) val initAggBufferExprs = initAuxGroupingExprs ++ initAggCallBufferExprs @@ -288,14 +289,14 @@ object HashAggCodeGenHelper { val getAuxGroupingExprs = auxGrouping.indices.map { idx => val (_, resultType) = aggBuffMapping(idx)(0) toRexInputRef(builder, bindRefOffset + idx, resultType) - }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression) + }.map(_.accept(new ExpressionConverter(builder))).map(exprCodegen.generateExpression) val getAggValueExprs = aggregates.zipWithIndex.map { case (agg: DeclarativeAggregateFunction, aggIndex) => val idx = auxGrouping.length + aggIndex agg.getValueExpression.accept(ResolveReference( ctx, builder, isMerge, bindRefOffset, agg, idx, argsMapping, aggBuffMapping)) - }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression) + }.map(_.accept(new ExpressionConverter(builder))).map(exprCodegen.generateExpression) val getValueExprs = getAuxGroupingExprs ++ getAggValueExprs val aggValueTerm = CodeGenUtils.newName("aggVal") @@ -379,7 +380,7 @@ object HashAggCodeGenHelper { agg.mergeExpressions.map( _.accept(ResolveReference( ctx, builder, isMerge = true, bindRefOffset, agg, idx, argsMapping, aggBuffMapping))) - }.map(_.accept(new RexNodeConverter(builder))).map(exprCodegen.generateExpression) + }.map(_.accept(new ExpressionConverter(builder))).map(exprCodegen.generateExpression) val aggBufferTypeWithoutAuxGrouping = if (auxGrouping.nonEmpty) { // auxGrouping does not need merge-code @@ -441,7 +442,7 @@ object HashAggCodeGenHelper { } }.map { case (expr: Expression, aggCall: AggregateCall) => - (exprCodegen.generateExpression(expr.accept(new RexNodeConverter(builder))), + (exprCodegen.generateExpression(expr.accept(new ExpressionConverter(builder))), aggCall.filterArg) } diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala index 8a2d753..5ff275e 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala @@ -33,8 +33,9 @@ import org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateColl import org.apache.flink.table.planner.codegen._ import org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping, genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, genInitFlatAggregateBuffer} import org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator.{asLong, isTimeIntervalLiteral} +import org.apache.flink.table.planner.expressions.CallExpressionResolver import org.apache.flink.table.planner.expressions.ExpressionBuilder._ -import org.apache.flink.table.planner.expressions.{CallExpressionResolver, RexNodeConverter} +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction import org.apache.flink.table.planner.plan.logical.{LogicalWindow, SlidingGroupWindow, TumblingGroupWindow} @@ -696,7 +697,7 @@ abstract class WindowCodeGenerator( remainder)), literal(index * slideSize)) exprCodegen.generateExpression(new CallExpressionResolver(relBuilder).resolve(expr).accept( - new RexNodeConverter(relBuilder.values(inputRowType)))) + new ExpressionConverter(relBuilder.values(inputRowType)))) } def getGrouping: Array[Int] = grouping diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala index bd2ab26..a4363e2 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala @@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.logical import org.apache.flink.table.api.config.OptimizerConfigOptions import org.apache.flink.table.expressions.Expression import org.apache.flink.table.planner.calcite.FlinkContext -import org.apache.flink.table.planner.expressions.RexNodeConverter +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, TableSourceTable} import org.apache.flink.table.planner.plan.stats.FlinkStatistic import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, RexNodeExtractor} @@ -112,7 +112,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule( call.transformTo(newScan) } else { relBuilder.push(scan) - val converter = new RexNodeConverter(relBuilder) + val converter = new ExpressionConverter(relBuilder) val remainingConditions = remainingPredicates.map(_.accept(converter)) ++ unconvertedRexNodes val remainingCondition = remainingConditions.reduce((l, r) => relBuilder.and(l, r)) val newFilter = filter.copy(filter.getTraitSet, newScan, remainingCondition) diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala index d46849f..a7be561 100644 --- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala +++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala @@ -25,7 +25,7 @@ import org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral, import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, ResolvedFieldReference} import org.apache.flink.table.functions.BuiltInFunctionDefinitions import org.apache.flink.table.planner.calcite.FlinkTypeFactory -import org.apache.flink.table.planner.expressions.RexNodeConverter +import org.apache.flink.table.planner.expressions.converter.ExpressionConverter import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable import org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo @@ -293,7 +293,7 @@ object TableSourceUtil { typeLiteral(outputType), valueLiteral(false)), outputType) - val rexExpression = castExpression.accept(new RexNodeConverter(relBuilder)) + val rexExpression = castExpression.accept(new ExpressionConverter(relBuilder)) relBuilder.clear() rexExpression } diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala index c271eac..aae2ce3 100644 --- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala +++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala @@ -28,7 +28,7 @@ import java.sql.Timestamp class SetOperatorsTest extends TableTestBase { - @Ignore("Support in subQuery in RexNodeConverter") + @Ignore("Support in subQuery in ExpressionConverter") @Test def testInWithFilter(): Unit = { val util = batchTestUtil()