This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 70f0081264cf01e5c94ddc692e4cb9433c2a6007
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Thu Aug 29 17:10:36 2019 +0800

    [FLINK-13775][table-planner-blink] Rename RexNodeConverter to 
ExpressionConverter
---
 .../ExpressionConverter.java}                      | 10 +++++---
 .../planner/plan/QueryOperationConverter.java      | 30 +++++++++++-----------
 .../codegen/agg/DeclarativeAggCodeGen.scala        |  5 ++--
 .../planner/codegen/agg/DistinctAggCodeGen.scala   |  4 +--
 .../planner/codegen/agg/ImperativeAggCodeGen.scala |  5 ++--
 .../codegen/agg/batch/AggCodeGenHelper.scala       | 18 +++++++------
 .../codegen/agg/batch/HashAggCodeGenHelper.scala   | 13 +++++-----
 .../codegen/agg/batch/WindowCodeGenerator.scala    |  5 ++--
 .../PushFilterIntoTableSourceScanRule.scala        |  4 +--
 .../table/planner/sources/TableSourceUtil.scala    |  4 +--
 .../batch/table/stringexpr/SetOperatorsTest.scala  |  2 +-
 11 files changed, 54 insertions(+), 46 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
similarity index 99%
rename from 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
rename to 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
index 2e3159d..b1e8e96 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/RexNodeConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.table.planner.expressions;
+package org.apache.flink.table.planner.expressions.converter;
 
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.dataformat.Decimal;
@@ -47,6 +47,8 @@ import 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexFieldVariable;
+import org.apache.flink.table.planner.expressions.RexNodeExpression;
+import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor;
 import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
 import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
@@ -117,7 +119,7 @@ import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoT
 /**
  * Visit expression to generator {@link RexNode}.
  */
-public class RexNodeConverter implements ExpressionVisitor<RexNode> {
+public class ExpressionConverter implements ExpressionVisitor<RexNode> {
 
        private final RelBuilder relBuilder;
        private final FlinkTypeFactory typeFactory;
@@ -125,7 +127,7 @@ public class RexNodeConverter implements 
ExpressionVisitor<RexNode> {
        // store mapping from BuiltInFunctionDefinition to it's 
RexNodeConversion.
        private final Map<FunctionDefinition, RexNodeConversion> 
conversionsOfBuiltInFunc = new IdentityHashMap<>();
 
-       public RexNodeConverter(RelBuilder relBuilder) {
+       public ExpressionConverter(RelBuilder relBuilder) {
                this.relBuilder = relBuilder;
                this.typeFactory = (FlinkTypeFactory) 
relBuilder.getRexBuilder().getTypeFactory();
 
@@ -358,7 +360,7 @@ public class RexNodeConverter implements 
ExpressionVisitor<RexNode> {
 
        private List<RexNode> convertCallChildren(List<Expression> children) {
                return children.stream()
-                               .map(expression -> 
expression.accept(RexNodeConverter.this))
+                               .map(expression -> 
expression.accept(ExpressionConverter.this))
                                .collect(Collectors.toList());
        }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
index 481f125..7a4da1f 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/plan/QueryOperationConverter.java
@@ -59,9 +59,9 @@ import 
org.apache.flink.table.planner.expressions.PlannerRowtimeAttribute;
 import org.apache.flink.table.planner.expressions.PlannerWindowEnd;
 import org.apache.flink.table.planner.expressions.PlannerWindowReference;
 import org.apache.flink.table.planner.expressions.PlannerWindowStart;
-import org.apache.flink.table.planner.expressions.RexNodeConverter;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
 import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor;
+import 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
 import org.apache.flink.table.planner.functions.utils.TableSqlFunction;
 import org.apache.flink.table.planner.operations.DataStreamQueryOperation;
 import org.apache.flink.table.planner.operations.PlannerQueryOperation;
@@ -119,7 +119,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
        private final FlinkRelBuilder relBuilder;
        private final SingleRelVisitor singleRelVisitor = new 
SingleRelVisitor();
        private final LookupCallResolver callResolver;
-       private final RexNodeConverter rexNodeConverter;
+       private final ExpressionConverter expressionConverter;
        private final AggregateVisitor aggregateVisitor = new 
AggregateVisitor();
        private final TableAggregateVisitor tableAggregateVisitor = new 
TableAggregateVisitor();
        private final JoinExpressionVisitor joinExpressionVisitor = new 
JoinExpressionVisitor();
@@ -127,7 +127,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
        public QueryOperationConverter(FlinkRelBuilder relBuilder, 
FunctionLookup functionCatalog) {
                this.relBuilder = relBuilder;
                this.callResolver = new LookupCallResolver(functionCatalog);
-               this.rexNodeConverter = new RexNodeConverter(relBuilder);
+               this.expressionConverter = new ExpressionConverter(relBuilder);
        }
 
        @Override
@@ -499,7 +499,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                                Expression aggregate = 
unresolvedCall.getChildren().get(0);
                                if (isFunctionOfKind(aggregate, AGGREGATE)) {
                                        return 
aggregate.accept(callResolver).accept(
-                                                       new 
AggCallVisitor(relBuilder, rexNodeConverter, aggregateName, false));
+                                                       new 
AggCallVisitor(relBuilder, expressionConverter, aggregateName, false));
                                }
                        }
                        throw new TableException("Expected named aggregate. 
Got: " + unresolvedCall);
@@ -514,15 +514,15 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
 
                        private final RelBuilder relBuilder;
                        private final SqlAggFunctionVisitor 
sqlAggFunctionVisitor;
-                       private final RexNodeConverter rexNodeConverter;
+                       private final ExpressionConverter expressionConverter;
                        private final String name;
                        private final boolean isDistinct;
 
-                       public AggCallVisitor(RelBuilder relBuilder, 
RexNodeConverter rexNodeConverter, String name,
+                       public AggCallVisitor(RelBuilder relBuilder, 
ExpressionConverter expressionConverter, String name,
                                        boolean isDistinct) {
                                this.relBuilder = relBuilder;
                                this.sqlAggFunctionVisitor = new 
SqlAggFunctionVisitor((FlinkTypeFactory) relBuilder.getTypeFactory());
-                               this.rexNodeConverter = rexNodeConverter;
+                               this.expressionConverter = expressionConverter;
                                this.name = name;
                                this.isDistinct = isDistinct;
                        }
@@ -532,7 +532,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                                FunctionDefinition def = 
call.getFunctionDefinition();
                                if (BuiltInFunctionDefinitions.DISTINCT == def) 
{
                                        Expression innerAgg = 
call.getChildren().get(0);
-                                       return innerAgg.accept(new 
AggCallVisitor(relBuilder, rexNodeConverter, name, true));
+                                       return innerAgg.accept(new 
AggCallVisitor(relBuilder, expressionConverter, name, true));
                                } else {
                                        SqlAggFunction sqlAggFunction = 
call.accept(sqlAggFunctionVisitor);
                                        return relBuilder.aggregateCall(
@@ -541,7 +541,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                                                false,
                                                null,
                                                name,
-                                               
call.getChildren().stream().map(expr -> expr.accept(rexNodeConverter))
+                                               
call.getChildren().stream().map(expr -> expr.accept(expressionConverter))
                                                        
.collect(Collectors.toList()));
                                }
                        }
@@ -557,7 +557,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                @Override
                public AggCall visit(CallExpression call) {
                        if (isFunctionOfKind(call, TABLE_AGGREGATE)) {
-                               return call.accept(new 
TableAggCallVisitor(relBuilder, rexNodeConverter));
+                               return call.accept(new 
TableAggCallVisitor(relBuilder, expressionConverter));
                        }
                        return defaultMethod(call);
                }
@@ -571,12 +571,12 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
 
                        private final RelBuilder relBuilder;
                        private final SqlAggFunctionVisitor 
sqlAggFunctionVisitor;
-                       private final RexNodeConverter rexNodeConverter;
+                       private final ExpressionConverter expressionConverter;
 
-                       public TableAggCallVisitor(RelBuilder relBuilder, 
RexNodeConverter rexNodeConverter) {
+                       public TableAggCallVisitor(RelBuilder relBuilder, 
ExpressionConverter expressionConverter) {
                                this.relBuilder = relBuilder;
                                this.sqlAggFunctionVisitor = new 
SqlAggFunctionVisitor((FlinkTypeFactory) relBuilder.getTypeFactory());
-                               this.rexNodeConverter = rexNodeConverter;
+                               this.expressionConverter = expressionConverter;
                        }
 
                        @Override
@@ -588,7 +588,7 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
                                        false,
                                        null,
                                        sqlAggFunction.toString(),
-                                       call.getChildren().stream().map(expr -> 
expr.accept(rexNodeConverter)).collect(toList()));
+                                       call.getChildren().stream().map(expr -> 
expr.accept(expressionConverter)).collect(toList()));
                        }
 
                        @Override
@@ -599,6 +599,6 @@ public class QueryOperationConverter extends 
QueryOperationDefaultVisitor<RelNod
        }
 
        private RexNode convertExprToRexNode(Expression expr) {
-               return expr.accept(callResolver).accept(rexNodeConverter);
+               return expr.accept(callResolver).accept(expressionConverter);
        }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
index afa4e5b..37941bf 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DeclarativeAggCodeGen.scala
@@ -22,7 +22,8 @@ import org.apache.flink.table.planner.calcite.FlinkTypeFactory
 import 
org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator.DISTINCT_KEY_TERM
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, GeneratedExpression}
 import 
org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.{toRexDistinctKey,
 toRexInputRef}
-import 
org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, 
RexNodeConverter, RexNodeExpression}
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
+import 
org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, 
RexNodeExpression}
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.{fromDataTypeToLogicalType,
 fromLogicalTypeToDataType}
@@ -65,7 +66,7 @@ class DeclarativeAggCodeGen(
   private val bufferTerms = function.aggBufferAttributes
       .map(a => s"agg${aggInfo.aggIndex}_${a.getName}")
 
-  private val rexNodeGen = new RexNodeConverter(relBuilder)
+  private val rexNodeGen = new ExpressionConverter(relBuilder)
 
   private val bufferNullTerms = {
     val exprCodegen = new ExprCodeGenerator(ctx, false)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
index 8db2d6d..22f0ba0 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/DistinctAggCodeGen.scala
@@ -27,7 +27,7 @@ import 
org.apache.flink.table.planner.codegen.GenerateUtils.{generateFieldAccess
 import org.apache.flink.table.planner.codegen.GeneratedExpression._
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
 import org.apache.flink.table.planner.codegen.{CodeGeneratorContext, 
ExprCodeGenerator, GeneratedExpression}
-import org.apache.flink.table.planner.expressions.RexNodeConverter
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import org.apache.flink.table.planner.plan.utils.DistinctInfo
 import 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType
 import org.apache.flink.table.types.DataType
@@ -91,7 +91,7 @@ class DistinctAggCodeGen(
   val isValueChangedTerm: String = s"is_distinct_value_changed_$distinctIndex"
   val isValueEmptyTerm: String = s"is_distinct_value_empty_$distinctIndex"
   val valueGenerator: DistinctValueGenerator = createDistinctValueGenerator()
-  private val rexNodeGen = new RexNodeConverter(relBuilder)
+  private val rexNodeGen = new ExpressionConverter(relBuilder)
 
   addReusableDistinctAccumulator()
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
index e5d0ea4..457310c 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/ImperativeAggCodeGen.scala
@@ -25,8 +25,9 @@ import 
org.apache.flink.table.planner.codegen.GenerateUtils.generateFieldAccess
 import org.apache.flink.table.planner.codegen.agg.AggsHandlerCodeGenerator._
 import org.apache.flink.table.planner.codegen.{CodeGenException, 
CodeGeneratorContext, ExprCodeGenerator, GeneratedExpression}
 import org.apache.flink.table.planner.dataview.DataViewSpec
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver
 import 
org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
-import 
org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, 
RexNodeConverter}
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAggFunctionUDIMethod,
 getAggUserDefinedInputTypes, getUserDefinedMethod, internalTypesToClasses, 
signatureToString}
 import org.apache.flink.table.planner.plan.utils.AggregateInfo
 import org.apache.flink.table.planner.utils.SingleElementIterator
@@ -112,7 +113,7 @@ class ImperativeAggCodeGen(
   private val externalResultType = aggInfo.externalResultType
   private val internalResultType = 
fromDataTypeToLogicalType(externalResultType)
 
-  private val rexNodeGen = new RexNodeConverter(relBuilder)
+  private val rexNodeGen = new ExpressionConverter(relBuilder)
 
   val viewSpecs: Array[DataViewSpec] = aggInfo.viewSpecs
   // add reusable dataviews to context
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
index c02ae98..389cd7e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/AggCodeGenHelper.scala
@@ -27,8 +27,9 @@ import org.apache.flink.table.functions.{AggregateFunction, 
UserDefinedFunction}
 import org.apache.flink.table.planner.codegen.CodeGenUtils._
 import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.STREAM_RECORD
 import org.apache.flink.table.planner.codegen._
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver
 import 
org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
-import 
org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, 
RexNodeConverter}
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.{getAccumulatorTypeOfAggregateFunction,
 getAggUserDefinedInputTypes}
 import org.apache.flink.table.runtime.context.ExecutionContextImpl
@@ -283,7 +284,7 @@ object AggCodeGenHelper {
       aggBufferNames: Array[Array[String]],
       aggBufferTypes: Array[Array[LogicalType]]): Seq[GeneratedExpression] = {
     val exprCodegen = new ExprCodeGenerator(ctx, false)
-    val converter = new RexNodeConverter(builder)
+    val converter = new ExpressionConverter(builder)
 
     val accessAuxGroupingExprs = auxGrouping.indices.map {
       idx => newLocalReference(aggBufferNames(idx)(0), aggBufferTypes(idx)(0))
@@ -338,7 +339,7 @@ object AggCodeGenHelper {
       case (agg: AggregateFunction[_, _]) =>
         Some(agg)
     }.map {
-      case (expr: Expression) => expr.accept(new RexNodeConverter(builder))
+      case (expr: Expression) => expr.accept(new ExpressionConverter(builder))
       case t@_ => t
     }.map {
       case (rex: RexNode) => exprCodegen.generateExpression(rex)
@@ -490,7 +491,7 @@ object AggCodeGenHelper {
         val idx = auxGrouping.length + aggIndex
         (agg, idx)
     }.map {
-      case (expr: Expression) => expr.accept(new RexNodeConverter(builder))
+      case (expr: Expression) => expr.accept(new ExpressionConverter(builder))
       case t@_ => t
     }.map {
       case (rex: RexNode) => exprCodegen.generateExpression(rex)
@@ -535,7 +536,8 @@ object AggCodeGenHelper {
     }.zip(aggBufferExprs.slice(auxGrouping.length, aggBufferExprs.size)).map {
       // DeclarativeAggregateFunction
       case ((expr: Expression), aggBufVar) =>
-        val mergeExpr = exprCodegen.generateExpression(expr.accept(new 
RexNodeConverter(builder)))
+        val mergeExpr = exprCodegen.generateExpression(
+          expr.accept(new ExpressionConverter(builder)))
         s"""
            |${mergeExpr.code}
            |${aggBufVar.nullTerm} = ${mergeExpr.nullTerm};
@@ -548,7 +550,7 @@ object AggCodeGenHelper {
         val (inputIndex, inputType) = argsMapping(aggIndex)(0)
         val inputRef = toRexInputRef(builder, inputIndex, inputType)
         val inputExpr = exprCodegen.generateExpression(
-          inputRef.accept(new RexNodeConverter(builder)))
+          inputRef.accept(new ExpressionConverter(builder)))
         val singleIterableClass = 
classOf[SingleElementIterator[_]].getCanonicalName
 
         val externalAccT = getAccumulatorTypeOfAggregateFunction(agg)
@@ -599,7 +601,7 @@ object AggCodeGenHelper {
     }.zip(aggBufferExprs.slice(auxGrouping.length, aggBufferExprs.size)).map {
       // DeclarativeAggregateFunction
       case ((expr: Expression, aggCall: AggregateCall), aggBufVar) =>
-        val accExpr = exprCodegen.generateExpression(expr.accept(new 
RexNodeConverter(builder)))
+        val accExpr = exprCodegen.generateExpression(expr.accept(new 
ExpressionConverter(builder)))
         (s"""
             |${accExpr.code}
             |${aggBufVar.nullTerm} = ${accExpr.nullTerm};
@@ -616,7 +618,7 @@ object AggCodeGenHelper {
         val inputExprs = inFields.map {
           f =>
             val inputRef = toRexInputRef(builder, f._1, f._2)
-            exprCodegen.generateExpression(inputRef.accept(new 
RexNodeConverter(builder)))
+            exprCodegen.generateExpression(inputRef.accept(new 
ExpressionConverter(builder)))
         }
 
         val externalUDITypes = getAggUserDefinedInputTypes(
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
index 8cdaf05..8faca38 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/HashAggCodeGenHelper.scala
@@ -27,8 +27,9 @@ import 
org.apache.flink.table.planner.codegen.CodeGenUtils.{binaryRowFieldSetAcc
 import org.apache.flink.table.planner.codegen._
 import 
org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.buildAggregateArgsMapping
 import org.apache.flink.table.planner.codegen.sort.SortCodeGenerator
+import org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver
 import 
org.apache.flink.table.planner.expressions.DeclarativeExpressionResolver.toRexInputRef
-import 
org.apache.flink.table.planner.expressions.{DeclarativeExpressionResolver, 
RexNodeConverter}
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import org.apache.flink.table.planner.plan.utils.SortUtil
 import org.apache.flink.table.runtime.generated.{NormalizedKeyComputer, 
RecordComparator}
@@ -206,7 +207,7 @@ object HashAggCodeGenHelper {
 
     val initAggCallBufferExprs = aggregates.flatMap(a =>
       a.asInstanceOf[DeclarativeAggregateFunction].initialValuesExpressions)
-        .map(_.accept(new RexNodeConverter(builder)))
+        .map(_.accept(new ExpressionConverter(builder)))
         .map(exprCodegen.generateExpression)
 
     val initAggBufferExprs = initAuxGroupingExprs ++ initAggCallBufferExprs
@@ -288,14 +289,14 @@ object HashAggCodeGenHelper {
       val getAuxGroupingExprs = auxGrouping.indices.map { idx =>
         val (_, resultType) = aggBuffMapping(idx)(0)
         toRexInputRef(builder, bindRefOffset + idx, resultType)
-      }.map(_.accept(new 
RexNodeConverter(builder))).map(exprCodegen.generateExpression)
+      }.map(_.accept(new 
ExpressionConverter(builder))).map(exprCodegen.generateExpression)
 
       val getAggValueExprs = aggregates.zipWithIndex.map {
         case (agg: DeclarativeAggregateFunction, aggIndex) =>
           val idx = auxGrouping.length + aggIndex
           agg.getValueExpression.accept(ResolveReference(
             ctx, builder, isMerge, bindRefOffset, agg, idx, argsMapping, 
aggBuffMapping))
-      }.map(_.accept(new 
RexNodeConverter(builder))).map(exprCodegen.generateExpression)
+      }.map(_.accept(new 
ExpressionConverter(builder))).map(exprCodegen.generateExpression)
 
       val getValueExprs = getAuxGroupingExprs ++ getAggValueExprs
       val aggValueTerm = CodeGenUtils.newName("aggVal")
@@ -379,7 +380,7 @@ object HashAggCodeGenHelper {
         agg.mergeExpressions.map(
           _.accept(ResolveReference(
             ctx, builder, isMerge = true, bindRefOffset, agg, idx, 
argsMapping, aggBuffMapping)))
-    }.map(_.accept(new 
RexNodeConverter(builder))).map(exprCodegen.generateExpression)
+    }.map(_.accept(new 
ExpressionConverter(builder))).map(exprCodegen.generateExpression)
 
     val aggBufferTypeWithoutAuxGrouping = if (auxGrouping.nonEmpty) {
       // auxGrouping does not need merge-code
@@ -441,7 +442,7 @@ object HashAggCodeGenHelper {
         }
     }.map {
       case (expr: Expression, aggCall: AggregateCall) =>
-        (exprCodegen.generateExpression(expr.accept(new 
RexNodeConverter(builder))),
+        (exprCodegen.generateExpression(expr.accept(new 
ExpressionConverter(builder))),
             aggCall.filterArg)
     }
 
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
index 8a2d753..5ff275e 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/agg/batch/WindowCodeGenerator.scala
@@ -33,8 +33,9 @@ import 
org.apache.flink.table.planner.codegen.OperatorCodeGenerator.generateColl
 import org.apache.flink.table.planner.codegen._
 import 
org.apache.flink.table.planner.codegen.agg.batch.AggCodeGenHelper.{buildAggregateArgsMapping,
 genAggregateByFlatAggregateBuffer, genFlatAggBufferExprs, 
genInitFlatAggregateBuffer}
 import 
org.apache.flink.table.planner.codegen.agg.batch.WindowCodeGenerator.{asLong, 
isTimeIntervalLiteral}
+import org.apache.flink.table.planner.expressions.CallExpressionResolver
 import org.apache.flink.table.planner.expressions.ExpressionBuilder._
-import org.apache.flink.table.planner.expressions.{CallExpressionResolver, 
RexNodeConverter}
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import 
org.apache.flink.table.planner.functions.aggfunctions.DeclarativeAggregateFunction
 import 
org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils.getAccumulatorTypeOfAggregateFunction
 import org.apache.flink.table.planner.plan.logical.{LogicalWindow, 
SlidingGroupWindow, TumblingGroupWindow}
@@ -696,7 +697,7 @@ abstract class WindowCodeGenerator(
         remainder)),
       literal(index * slideSize))
     exprCodegen.generateExpression(new 
CallExpressionResolver(relBuilder).resolve(expr).accept(
-      new RexNodeConverter(relBuilder.values(inputRowType))))
+      new ExpressionConverter(relBuilder.values(inputRowType))))
   }
 
   def getGrouping: Array[Int] = grouping
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
index bd2ab26..a4363e2 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/PushFilterIntoTableSourceScanRule.scala
@@ -21,7 +21,7 @@ package org.apache.flink.table.planner.plan.rules.logical
 import org.apache.flink.table.api.config.OptimizerConfigOptions
 import org.apache.flink.table.expressions.Expression
 import org.apache.flink.table.planner.calcite.FlinkContext
-import org.apache.flink.table.planner.expressions.RexNodeConverter
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import org.apache.flink.table.planner.plan.schema.{FlinkRelOptTable, 
TableSourceTable}
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic
 import org.apache.flink.table.planner.plan.utils.{FlinkRelOptUtil, 
RexNodeExtractor}
@@ -112,7 +112,7 @@ class PushFilterIntoTableSourceScanRule extends RelOptRule(
       call.transformTo(newScan)
     } else {
       relBuilder.push(scan)
-      val converter = new RexNodeConverter(relBuilder)
+      val converter = new ExpressionConverter(relBuilder)
       val remainingConditions = remainingPredicates.map(_.accept(converter)) 
++ unconvertedRexNodes
       val remainingCondition = remainingConditions.reduce((l, r) => 
relBuilder.and(l, r))
       val newFilter = filter.copy(filter.getTraitSet, newScan, 
remainingCondition)
diff --git 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
index d46849f..a7be561 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
+++ 
b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/sources/TableSourceUtil.scala
@@ -25,7 +25,7 @@ import 
org.apache.flink.table.expressions.utils.ApiExpressionUtils.{typeLiteral,
 import org.apache.flink.table.expressions.{CallExpression, ResolvedExpression, 
ResolvedFieldReference}
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory
-import org.apache.flink.table.planner.expressions.RexNodeConverter
+import org.apache.flink.table.planner.expressions.converter.ExpressionConverter
 import org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter
 import org.apache.flink.table.runtime.types.PlannerTypeUtils.isAssignable
 import 
org.apache.flink.table.runtime.types.TypeInfoDataTypeConverter.fromDataTypeToTypeInfo
@@ -293,7 +293,7 @@ object TableSourceUtil {
           typeLiteral(outputType),
           valueLiteral(false)),
         outputType)
-      val rexExpression = castExpression.accept(new 
RexNodeConverter(relBuilder))
+      val rexExpression = castExpression.accept(new 
ExpressionConverter(relBuilder))
       relBuilder.clear()
       rexExpression
     }
diff --git 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
index c271eac..aae2ce3 100644
--- 
a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
+++ 
b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/table/stringexpr/SetOperatorsTest.scala
@@ -28,7 +28,7 @@ import java.sql.Timestamp
 
 class SetOperatorsTest extends TableTestBase {
 
-  @Ignore("Support in subQuery in RexNodeConverter")
+  @Ignore("Support in subQuery in ExpressionConverter")
   @Test
   def testInWithFilter(): Unit = {
     val util = batchTestUtil()

Reply via email to