This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5a0b317ad008e031bbfd16e75ebc2a7eb9ae25f
Author: JingsongLi <lzljs3620...@aliyun.com>
AuthorDate: Thu Aug 29 17:14:21 2019 +0800

    [FLINK-13775][table-planner-blink] Refactor ExpressionConverter
    
    This closes #9485
---
 .../converter/CallExpressionConvertRule.java       |  56 ++
 .../converter/CustomizedConvertRule.java           | 381 +++++++++
 .../expressions/converter/DirectConvertRule.java   | 157 ++++
 .../expressions/converter/ExpressionConverter.java | 907 +++------------------
 .../expressions/converter/OverConvertRule.java     | 197 +++++
 .../converter/ScalarFunctionConvertRule.java       |  54 ++
 tools/maven/suppressions.xml                       |   4 +-
 7 files changed, 980 insertions(+), 776 deletions(-)

diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/CallExpressionConvertRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/CallExpressionConvertRule.java
new file mode 100644
index 0000000..bf6a5c4
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/CallExpressionConvertRule.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.tools.RelBuilder;
+
+import java.util.Optional;
+
+/**
+ * Rule to convert {@link CallExpression}.
+ *
+ * <p>A rule either returns the converted {@link RexNode} or signals, through an empty
+ * {@link Optional}, that it did not convert the call.
+ */
+public interface CallExpressionConvertRule {
+
+       /**
+        * Convert call expression with context to RexNode.
+        *
+        * @param call the call expression to convert.
+        * @param context gives access to children conversion, the builder and the type factory.
+        * @return {@link Optional#of} the converted RexNode on success, {@link Optional#empty()} on failure.
+        */
+       Optional<RexNode> convert(CallExpression call, ConvertContext context);
+
+       /**
+        * Context of {@link CallExpressionConvertRule}.
+        */
+       interface ConvertContext {
+
+               /**
+                * Convert expression to RexNode, used by children conversion.
+                */
+               RexNode toRexNode(Expression expr);
+
+               /**
+                * Returns the {@link RelBuilder} that rules use to build calls.
+                */
+               RelBuilder getRelBuilder();
+
+               /**
+                * Returns the {@link FlinkTypeFactory} that rules use to create result types.
+                */
+               FlinkTypeFactory getTypeFactory();
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/CustomizedConvertRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/CustomizedConvertRule.java
new file mode 100644
index 0000000..ca8193b
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/CustomizedConvertRule.java
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ExpressionUtils;
+import org.apache.flink.table.expressions.TableReferenceExpression;
+import org.apache.flink.table.expressions.TypeLiteralExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
+import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexSubQuery;
+import org.apache.calcite.sql.fun.SqlTrimFunction;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
+import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
+import static 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.extractValue;
+import static 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.toRexNodes;
+import static 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+import static 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString;
+import static 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isTemporal;
+import static 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isTimeInterval;
+
+/**
+ * Customized {@link CallExpressionConvertRule}. The function conversions here all require special
+ * logic to produce a Calcite RexNode, for example reading the literal values of the inputs or
+ * converting a call into a combination of other functions.
+ */
+public class CustomizedConvertRule implements CallExpressionConvertRule {
+
+       /**
+        * Conversions keyed by the function definition they handle.
+        */
+       private static final Map<FunctionDefinition, Conversion> DEFINITION_RULE_MAP = createRuleMap();
+
+       /**
+        * Builds the table that maps every supported function definition to its conversion.
+        */
+       private static Map<FunctionDefinition, Conversion> createRuleMap() {
+               Map<FunctionDefinition, Conversion> rules = new HashMap<>();
+               rules.put(BuiltInFunctionDefinitions.CAST, CustomizedConvertRule::convertCast);
+               rules.put(BuiltInFunctionDefinitions.REINTERPRET_CAST, CustomizedConvertRule::convertReinterpretCast);
+               rules.put(BuiltInFunctionDefinitions.IN, CustomizedConvertRule::convertIn);
+               rules.put(BuiltInFunctionDefinitions.GET, CustomizedConvertRule::convertGet);
+               rules.put(BuiltInFunctionDefinitions.TRIM, CustomizedConvertRule::convertTrim);
+               rules.put(BuiltInFunctionDefinitions.AS, CustomizedConvertRule::convertAs);
+               rules.put(BuiltInFunctionDefinitions.BETWEEN, CustomizedConvertRule::convertBetween);
+               rules.put(BuiltInFunctionDefinitions.NOT_BETWEEN, CustomizedConvertRule::convertNotBetween);
+               rules.put(BuiltInFunctionDefinitions.REPLACE, CustomizedConvertRule::convertReplace);
+               rules.put(BuiltInFunctionDefinitions.PLUS, CustomizedConvertRule::convertPlus);
+               rules.put(BuiltInFunctionDefinitions.CEIL, CustomizedConvertRule::convertCeil);
+               rules.put(BuiltInFunctionDefinitions.FLOOR, CustomizedConvertRule::convertFloor);
+               rules.put(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS, CustomizedConvertRule::convertTemporalOverlaps);
+               rules.put(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, CustomizedConvertRule::convertTimestampDiff);
+               rules.put(BuiltInFunctionDefinitions.ARRAY, CustomizedConvertRule::convertArray);
+               rules.put(BuiltInFunctionDefinitions.MAP, CustomizedConvertRule::convertMap);
+               rules.put(BuiltInFunctionDefinitions.ROW, CustomizedConvertRule::convertRow);
+               rules.put(BuiltInFunctionDefinitions.ORDER_ASC, CustomizedConvertRule::convertOrderAsc);
+               rules.put(BuiltInFunctionDefinitions.SQRT, CustomizedConvertRule::convertSqrt);
+
+               // blink expression
+               rules.put(InternalFunctionDefinitions.THROW_EXCEPTION, CustomizedConvertRule::convertThrowException);
+               return rules;
+       }
+
+       /**
+        * Looks up the conversion registered for the call's function definition and applies it.
+        *
+        * @return the conversion result, or {@link Optional#empty()} when no conversion is
+        *     registered for the function definition.
+        */
+       @Override
+       public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
+               Conversion registered = DEFINITION_RULE_MAP.get(call.getFunctionDefinition());
+               if (registered == null) {
+                       return Optional.empty();
+               }
+               return Optional.ofNullable(registered.convert(call, context));
+       }
+
+       /**
+        * Converts CAST. The first argument is the operand, the second the target type literal.
+        */
+       private static RexNode convertCast(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 2);
+               List<Expression> args = call.getChildren();
+               RexNode operand = context.toRexNode(args.get(0));
+               TypeLiteralExpression typeLiteral = (TypeLiteralExpression) args.get(1);
+               // the target type takes over the nullability of the operand
+               LogicalType targetType = typeLiteral.getOutputDataType().getLogicalType()
+                       .copy(operand.getType().isNullable());
+               RelDataType targetRelType = context.getTypeFactory().createFieldTypeFromLogicalType(targetType);
+               return context.getRelBuilder().getRexBuilder().makeAbstractCast(targetRelType, operand);
+       }
+
+       /**
+        * Converts ORDER_ASC by converting its single child; no additional node is created.
+        */
+       private static RexNode convertOrderAsc(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 1);
+               Expression sortKey = call.getChildren().get(0);
+               return context.toRexNode(sortKey);
+       }
+
+       /**
+        * Converts TIMESTAMP_DIFF, which takes exactly three arguments.
+        */
+       private static RexNode convertTimestampDiff(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 3);
+               List<RexNode> operands = toRexNodes(context, call.getChildren());
+               RexNode first = operands.get(0);
+               RexNode second = operands.get(1);
+               RexNode third = operands.get(2);
+               // different orders between flink table api and calcite.
+               return context.getRelBuilder().call(FlinkSqlOperatorTable.TIMESTAMP_DIFF, first, third, second);
+       }
+
+       /**
+        * Converts NOT_BETWEEN(expr, lower, upper) to {@code expr < lower OR expr > upper}.
+        */
+       private static RexNode convertNotBetween(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 3);
+               List<RexNode> operands = toRexNodes(context, call.getChildren());
+               RexNode value = operands.get(0);
+               RexNode belowLower = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.LESS_THAN, value, operands.get(1));
+               RexNode aboveUpper = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.GREATER_THAN, value, operands.get(2));
+               return context.getRelBuilder().or(belowLower, aboveUpper);
+       }
+
+       /**
+        * Converts BETWEEN(expr, lower, upper) to {@code expr >= lower AND expr <= upper}.
+        */
+       private static RexNode convertBetween(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 3);
+               List<RexNode> operands = toRexNodes(context, call.getChildren());
+               RexNode value = operands.get(0);
+               RexNode atLeastLower = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, value, operands.get(1));
+               RexNode atMostUpper = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, value, operands.get(2));
+               return context.getRelBuilder().and(atLeastLower, atMostUpper);
+       }
+
+       /**
+        * Converts CEIL with one or two arguments. With two arguments the operands are handed to
+        * the operator in swapped order.
+        */
+       private static RexNode convertCeil(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 1, 2);
+               List<Expression> args = call.getChildren();
+               List<RexNode> operands = toRexNodes(context, args);
+               if (args.size() != 1) {
+                       return context.getRelBuilder().call(
+                               FlinkSqlOperatorTable.CEIL, operands.get(1), operands.get(0));
+               }
+               return context.getRelBuilder().call(FlinkSqlOperatorTable.CEIL, operands);
+       }
+
+       /**
+        * Converts FLOOR with one or two arguments. With two arguments the operands are handed to
+        * the operator in swapped order.
+        */
+       private static RexNode convertFloor(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 1, 2);
+               List<Expression> args = call.getChildren();
+               List<RexNode> operands = toRexNodes(context, args);
+               if (args.size() != 1) {
+                       return context.getRelBuilder().call(
+                               FlinkSqlOperatorTable.FLOOR, operands.get(1), operands.get(0));
+               }
+               return context.getRelBuilder().call(FlinkSqlOperatorTable.FLOOR, operands);
+       }
+
+       /**
+        * Converts ARRAY to a call of the array value constructor.
+        *
+        * <p>The element type of the resulting array type is taken from the first element.
+        *
+        * @throws TableException if the call has no elements.
+        */
+       private static RexNode convertArray(CallExpression call, ConvertContext context) {
+               // TODO get type from CallExpression directly until introduce type inference on Expression
+               List<Expression> children = call.getChildren();
+               // the element type is derived from the first element, so at least one is required
+               checkArgument(call, !children.isEmpty());
+               List<RexNode> childrenRexNode = toRexNodes(context, children);
+               ArrayType arrayType = new ArrayType(toLogicalType(childrenRexNode.get(0).getType()));
+               RelDataType relDataType = context.getTypeFactory().createFieldTypeFromLogicalType(arrayType);
+               return context.getRelBuilder().getRexBuilder().makeCall(
+                       relDataType, FlinkSqlOperatorTable.ARRAY_VALUE_CONSTRUCTOR, childrenRexNode);
+       }
+
+       /**
+        * Converts MAP to a call of the map value constructor. The arguments must be a non-empty,
+        * even-sized list; the key type is taken from the first argument and the value type from
+        * the last one.
+        */
+       private static RexNode convertMap(CallExpression call, ConvertContext context) {
+               // TODO get type from CallExpression directly until introduce type inference on Expression
+               List<Expression> entries = call.getChildren();
+               boolean hasKeyValuePairs = !entries.isEmpty() && entries.size() % 2 == 0;
+               checkArgument(call, hasKeyValuePairs);
+               List<RexNode> operands = toRexNodes(context, entries);
+               RelDataType firstKeyType = operands.get(0).getType();
+               RelDataType lastValueType = operands.get(operands.size() - 1).getType();
+               RelDataType mapRelType = context.getTypeFactory().createMapType(firstKeyType, lastValueType);
+               return context.getRelBuilder().getRexBuilder().makeCall(
+                       mapRelType, FlinkSqlOperatorTable.MAP_VALUE_CONSTRUCTOR, operands);
+       }
+
+       /**
+        * Converts ROW to a call of the row constructor whose row type is built from the types of
+        * the converted children.
+        */
+       private static RexNode convertRow(CallExpression call, ConvertContext context) {
+               // TODO get type from CallExpression directly until introduce type inference on Expression
+               List<RexNode> fields = toRexNodes(context, call.getChildren());
+               LogicalType[] fieldTypes = new LogicalType[fields.size()];
+               for (int i = 0; i < fieldTypes.length; i++) {
+                       fieldTypes[i] = toLogicalType(fields.get(i).getType());
+               }
+               RelDataType rowRelType =
+                       context.getTypeFactory().createFieldTypeFromLogicalType(RowType.of(fieldTypes));
+               return context.getRelBuilder().getRexBuilder().makeCall(
+                       rowRelType, FlinkSqlOperatorTable.ROW, fields);
+       }
+
+       /**
+        * Converts TEMPORAL_OVERLAPS, which takes two periods given as four arguments: a time point
+        * followed by either a second time point or a time interval, for the left and the right side.
+        */
+       private static RexNode convertTemporalOverlaps(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 4);
+               List<RexNode> operands = toRexNodes(context, call.getChildren());
+               // Standard conversion of the OVERLAPS operator.
+               // Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
+               RexNode leftPoint = operands.get(0);
+               RexNode leftPeriod = operands.get(1);
+               RexNode rightPoint = operands.get(2);
+               RexNode rightPeriod = operands.get(3);
+
+               // an interval is turned into a second time point by adding it to the first one
+               RexNode leftOther = isTimeInterval(toLogicalType(leftPeriod.getType()))
+                       ? context.getRelBuilder().call(FlinkSqlOperatorTable.DATETIME_PLUS, leftPoint, leftPeriod)
+                       : leftPeriod;
+               // sort end points into start and end, such that (s0 <= e0) and (s1 <= e1).
+               RexNode leftInOrder = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, leftPoint, leftOther);
+               RexNode s0 = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.CASE, leftInOrder, leftPoint, leftOther);
+               RexNode e0 = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.CASE, leftInOrder, leftOther, leftPoint);
+
+               RexNode rightOther = isTimeInterval(toLogicalType(rightPeriod.getType()))
+                       ? context.getRelBuilder().call(FlinkSqlOperatorTable.DATETIME_PLUS, rightPoint, rightPeriod)
+                       : rightPeriod;
+               RexNode rightInOrder = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, rightPoint, rightOther);
+               RexNode s1 = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.CASE, rightInOrder, rightPoint, rightOther);
+               RexNode e1 = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.CASE, rightInOrder, rightOther, rightPoint);
+
+               // (e0 >= s1) AND (e1 >= s0)
+               RexNode leftEndsAfterRightStarts = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e0, s1);
+               RexNode rightEndsAfterLeftStarts = context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e1, s0);
+               return context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.AND, leftEndsAfterRightStarts, rightEndsAfterLeftStarts);
+       }
+
+       /**
+        * Converts PLUS. Depending on the operand types the result is a string concatenation, a
+        * datetime addition or a plain addition; the cases are tested in the order written below.
+        */
+       private static RexNode convertPlus(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 2);
+               List<RexNode> operands = toRexNodes(context, call.getChildren());
+               RexNode left = operands.get(0);
+               RexNode right = operands.get(1);
+
+               // string on either side: concatenate, casting the other operand to VARCHAR
+               if (isCharacterString(toLogicalType(left.getType()))) {
+                       return context.getRelBuilder().call(
+                               FlinkSqlOperatorTable.CONCAT,
+                               left,
+                               context.getRelBuilder().cast(right, VARCHAR));
+               }
+               if (isCharacterString(toLogicalType(right.getType()))) {
+                       return context.getRelBuilder().call(
+                               FlinkSqlOperatorTable.CONCAT,
+                               context.getRelBuilder().cast(left, VARCHAR),
+                               right);
+               }
+
+               // two intervals of the very same type are added with the plain operator
+               if (isTimeInterval(toLogicalType(left.getType())) && left.getType() == right.getType()) {
+                       return context.getRelBuilder().call(FlinkSqlOperatorTable.PLUS, operands);
+               }
+               if (isTimeInterval(toLogicalType(left.getType())) && isTemporal(toLogicalType(right.getType()))) {
+                       // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
+                       // we manually switch them here
+                       return context.getRelBuilder().call(FlinkSqlOperatorTable.DATETIME_PLUS, right, left);
+               }
+               if (isTemporal(toLogicalType(left.getType())) && isTemporal(toLogicalType(right.getType()))) {
+                       return context.getRelBuilder().call(FlinkSqlOperatorTable.DATETIME_PLUS, operands);
+               }
+               return context.getRelBuilder().call(FlinkSqlOperatorTable.PLUS, operands);
+       }
+
+       /**
+        * Converts REPLACE with two or three arguments. With two arguments the character length of
+        * the first operand is passed as the third operand.
+        */
+       private static RexNode convertReplace(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 2, 3);
+               List<Expression> args = call.getChildren();
+               List<RexNode> operands = toRexNodes(context, args);
+               if (args.size() != 2) {
+                       return context.getRelBuilder().call(FlinkSqlOperatorTable.REPLACE, operands);
+               }
+               RexNode first = operands.get(0);
+               RexNode second = operands.get(1);
+               RexNode firstLength = context.getRelBuilder().call(FlinkSqlOperatorTable.CHAR_LENGTH, first);
+               return context.getRelBuilder().call(FlinkSqlOperatorTable.REPLACE, first, second, firstLength);
+       }
+
+       /**
+        * Converts AS. The first argument is the aliased expression, the second a string literal
+        * holding the alias.
+        */
+       private static RexNode convertAs(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 2);
+               List<Expression> args = call.getChildren();
+               ValueLiteralExpression aliasLiteral = (ValueLiteralExpression) args.get(1);
+               String alias = extractValue(aliasLiteral, String.class);
+               RexNode aliased = context.toRexNode(args.get(0));
+               return context.getRelBuilder().alias(aliased, alias);
+       }
+
+       /**
+        * Converts TRIM. The arguments are, in order: a boolean literal "remove leading", a boolean
+        * literal "remove trailing", the characters to trim and the string to trim.
+        *
+        * @throws IllegalArgumentException if neither leading nor trailing removal is requested.
+        */
+       private static RexNode convertTrim(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 4);
+               List<Expression> children = call.getChildren();
+               ValueLiteralExpression removeLeadingExpr = (ValueLiteralExpression) children.get(0);
+               Boolean removeLeading = extractValue(removeLeadingExpr, Boolean.class);
+               ValueLiteralExpression removeTrailingExpr = (ValueLiteralExpression) children.get(1);
+               Boolean removeTrailing = extractValue(removeTrailingExpr, Boolean.class);
+               RexNode trimString = context.toRexNode(children.get(2));
+               RexNode str = context.toRexNode(children.get(3));
+               // typed as the concrete flag enum instead of the raw Enum type
+               SqlTrimFunction.Flag trimMode;
+               if (removeLeading && removeTrailing) {
+                       trimMode = SqlTrimFunction.Flag.BOTH;
+               } else if (removeLeading) {
+                       trimMode = SqlTrimFunction.Flag.LEADING;
+               } else if (removeTrailing) {
+                       trimMode = SqlTrimFunction.Flag.TRAILING;
+               } else {
+                       throw new IllegalArgumentException("Unsupported trim mode.");
+               }
+               return context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.TRIM,
+                       context.getRelBuilder().getRexBuilder().makeFlag(trimMode),
+                       trimString,
+                       str);
+       }
+
+       /**
+        * Converts GET to a field access on the converted first argument. The second argument is a
+        * literal that is either the name of the field or its index.
+        *
+        * @throws TableException if the resolved field index is negative, which is the case when
+        *     the given field name is not among the field names of the first argument's type.
+        */
+       private static RexNode convertGet(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 2);
+               RexNode child = context.toRexNode(call.getChildren().get(0));
+               ValueLiteralExpression keyLiteral = (ValueLiteralExpression) call.getChildren().get(1);
+               // a string key is resolved to the position of the field with that name,
+               // any other key is read as the position itself
+               Optional<Integer> indexOptional = ExpressionUtils
+                               .extractValue(keyLiteral, String.class)
+                               .map(child.getType().getFieldNames()::indexOf);
+               int index = indexOptional.orElseGet(() -> extractValue(keyLiteral, Integer.class));
+               // indexOf yields -1 for an unknown field name; reject it here with a message that
+               // names the call instead of handing an invalid ordinal to the RexBuilder
+               checkArgument(call, index >= 0);
+               return context.getRelBuilder().getRexBuilder().makeFieldAccess(child, index);
+       }
+
+       /**
+        * Converts IN. When the second argument is a table reference the call becomes an IN
+        * sub-query over that table's query operation, otherwise a plain IN call over all
+        * converted arguments.
+        */
+       private static RexNode convertIn(CallExpression call, ConvertContext context) {
+               checkArgument(call, call.getChildren().size() > 1);
+               List<Expression> args = call.getChildren();
+               Expression firstCandidate = args.get(1);
+               if (!(firstCandidate instanceof TableReferenceExpression)) {
+                       List<RexNode> operands = toRexNodes(context, args);
+                       return context.getRelBuilder().call(FlinkSqlOperatorTable.IN, operands);
+               }
+
+               QueryOperation subQuery = ((TableReferenceExpression) firstCandidate).getQueryOperation();
+               RexNode needle = context.toRexNode(args.get(0));
+               FlinkRelBuilder flinkRelBuilder = (FlinkRelBuilder) context.getRelBuilder();
+               return RexSubQuery.in(
+                       flinkRelBuilder.queryOperation(subQuery).build(),
+                       ImmutableList.of(needle));
+       }
+
+       /**
+        * Converts REINTERPRET_CAST. The arguments are the operand, the target type literal and the
+        * expression passed on as the overflow check.
+        */
+       private static RexNode convertReinterpretCast(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 3);
+               List<Expression> args = call.getChildren();
+               RexNode operand = context.toRexNode(args.get(0));
+               TypeLiteralExpression typeLiteral = (TypeLiteralExpression) args.get(1);
+               RexNode overflowCheck = context.toRexNode(args.get(2));
+               // the target type takes over the nullability of the operand
+               LogicalType targetType = typeLiteral.getOutputDataType().getLogicalType()
+                       .copy(operand.getType().isNullable());
+               RelDataType targetRelType = context.getTypeFactory().createFieldTypeFromLogicalType(targetType);
+               return context.getRelBuilder().getRexBuilder().makeReinterpretCast(
+                       targetRelType, operand, overflowCheck);
+       }
+
+       /**
+        * Converts SQRT(x) to POWER(x, 0.5).
+        */
+       private static RexNode convertSqrt(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 1);
+               List<Expression> baseAndExponent = Arrays.asList(
+                       call.getChildren().get(0),
+                       ApiExpressionUtils.valueLiteral(0.5));
+               return context.getRelBuilder().call(
+                       FlinkSqlOperatorTable.POWER, toRexNodes(context, baseAndExponent));
+       }
+
+       /**
+        * Converts THROW_EXCEPTION. The first argument is passed as the operand of the function,
+        * the second is a type literal from which the function's type is created.
+        */
+       private static RexNode convertThrowException(CallExpression call, ConvertContext context) {
+               checkArgumentNumber(call, 2);
+               TypeLiteralExpression typeLiteral = (TypeLiteralExpression) call.getChildren().get(1);
+               DataType resultType = typeLiteral.getOutputDataType();
+               RelDataType resultRelType = context.getTypeFactory()
+                       .createFieldTypeFromLogicalType(fromDataTypeToLogicalType(resultType));
+               SqlThrowExceptionFunction throwFunction = new SqlThrowExceptionFunction(resultRelType);
+               RexNode operand = context.toRexNode(call.getChildren().get(0));
+               return context.getRelBuilder().call(throwFunction, operand);
+       }
+
+       private static void checkArgumentNumber(CallExpression call, int... 
numbers) {
+               boolean find = false;
+               for (int number : numbers) {
+                       if (call.getChildren().size() == number) {
+                               find = true;
+                               break;
+                       }
+               }
+
+               checkArgument(call, find);
+       }
+
+       /**
+        * Throws a {@link TableException} naming the call when the given condition is false.
+        */
+       private static void checkArgument(CallExpression call, boolean check) {
+               if (check) {
+                       return;
+               }
+               throw new TableException("Invalid arguments for call: " + call);
+       }
+
+       /**
+        * Conversion of one kind of call to a RexNode; the values of the rule map are of this type.
+        */
+       private interface Conversion {
+               RexNode convert(CallExpression call, ConvertContext context);
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
new file mode 100644
index 0000000..432b8c7
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlOperator;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.toRexNodes;
+
+/**
+ * A {@link CallExpressionConvertRule} that performs a simple one-to-one mapping between
+ * {@link FunctionDefinition} and a corresponding {@link SqlOperator}.
+ *
+ * <p>A call whose function definition is registered in the mapping is converted by calling the
+ * mapped operator on the converted children of the call. For every other call the rule returns
+ * an empty {@link Optional}.
+ */
+public class DirectConvertRule implements CallExpressionConvertRule {
+
+       // Function definition -> operator. Filled once by the static initializer below; every
+       // definition is registered exactly once.
+       private static final Map<FunctionDefinition, SqlOperator> DEFINITION_OPERATOR_MAP = new HashMap<>();
+       static {
+               // logic functions
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.AND, FlinkSqlOperatorTable.AND);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.OR, FlinkSqlOperatorTable.OR);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.NOT, FlinkSqlOperatorTable.NOT);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IF, FlinkSqlOperatorTable.CASE);
+
+               // comparison functions
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.EQUALS, FlinkSqlOperatorTable.EQUALS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.GREATER_THAN, FlinkSqlOperatorTable.GREATER_THAN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LESS_THAN, FlinkSqlOperatorTable.LESS_THAN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.NOT_EQUALS, FlinkSqlOperatorTable.NOT_EQUALS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IS_NULL, FlinkSqlOperatorTable.IS_NULL);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IS_NOT_NULL, FlinkSqlOperatorTable.IS_NOT_NULL);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IS_TRUE, FlinkSqlOperatorTable.IS_TRUE);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IS_FALSE, FlinkSqlOperatorTable.IS_FALSE);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, FlinkSqlOperatorTable.IS_NOT_TRUE);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.IS_NOT_FALSE, FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+               // string functions
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CHAR_LENGTH, FlinkSqlOperatorTable.CHAR_LENGTH);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.INIT_CAP, FlinkSqlOperatorTable.INITCAP);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LIKE, FlinkSqlOperatorTable.LIKE);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LOWER, FlinkSqlOperatorTable.LOWER);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SIMILAR, FlinkSqlOperatorTable.SIMILAR_TO);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SUBSTRING, FlinkSqlOperatorTable.SUBSTRING);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.UPPER, FlinkSqlOperatorTable.UPPER);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.POSITION, FlinkSqlOperatorTable.POSITION);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.OVERLAY, FlinkSqlOperatorTable.OVERLAY);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CONCAT, FlinkSqlOperatorTable.CONCAT_FUNCTION);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CONCAT_WS, FlinkSqlOperatorTable.CONCAT_WS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LPAD, FlinkSqlOperatorTable.LPAD);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RPAD, FlinkSqlOperatorTable.RPAD);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, FlinkSqlOperatorTable.REGEXP_EXTRACT);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.FROM_BASE64, FlinkSqlOperatorTable.FROM_BASE64);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.TO_BASE64, FlinkSqlOperatorTable.TO_BASE64);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.UUID, FlinkSqlOperatorTable.UUID);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LTRIM, FlinkSqlOperatorTable.LTRIM);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RTRIM, FlinkSqlOperatorTable.RTRIM);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.REPEAT, FlinkSqlOperatorTable.REPEAT);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.REGEXP_REPLACE, FlinkSqlOperatorTable.REGEXP_REPLACE);
+
+               // math functions
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MINUS, FlinkSqlOperatorTable.MINUS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.DIVIDE, FlinkSqlOperatorTable.DIVIDE);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.TIMES, FlinkSqlOperatorTable.MULTIPLY);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ABS, FlinkSqlOperatorTable.ABS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.EXP, FlinkSqlOperatorTable.EXP);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LOG10, FlinkSqlOperatorTable.LOG10);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LOG2, FlinkSqlOperatorTable.LOG2);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LN, FlinkSqlOperatorTable.LN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LOG, FlinkSqlOperatorTable.LOG);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.POWER, FlinkSqlOperatorTable.POWER);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MOD, FlinkSqlOperatorTable.MOD);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MINUS_PREFIX, FlinkSqlOperatorTable.UNARY_MINUS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SIN, FlinkSqlOperatorTable.SIN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.COS, FlinkSqlOperatorTable.COS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SINH, FlinkSqlOperatorTable.SINH);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.TAN, FlinkSqlOperatorTable.TAN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.TANH, FlinkSqlOperatorTable.TANH);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.COT, FlinkSqlOperatorTable.COT);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ASIN, FlinkSqlOperatorTable.ASIN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ACOS, FlinkSqlOperatorTable.ACOS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ATAN, FlinkSqlOperatorTable.ATAN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ATAN2, FlinkSqlOperatorTable.ATAN2);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.COSH, FlinkSqlOperatorTable.COSH);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.DEGREES, FlinkSqlOperatorTable.DEGREES);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RADIANS, FlinkSqlOperatorTable.RADIANS);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SIGN, FlinkSqlOperatorTable.SIGN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ROUND, FlinkSqlOperatorTable.ROUND);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.PI, FlinkSqlOperatorTable.PI);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.E, FlinkSqlOperatorTable.E);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RAND, FlinkSqlOperatorTable.RAND);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.RAND_INTEGER, FlinkSqlOperatorTable.RAND_INTEGER);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.BIN, FlinkSqlOperatorTable.BIN);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.HEX, FlinkSqlOperatorTable.HEX);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.TRUNCATE, FlinkSqlOperatorTable.TRUNCATE);
+
+               // time functions
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.EXTRACT, FlinkSqlOperatorTable.EXTRACT);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CURRENT_DATE, FlinkSqlOperatorTable.CURRENT_DATE);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CURRENT_TIME, FlinkSqlOperatorTable.CURRENT_TIME);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP, FlinkSqlOperatorTable.CURRENT_TIMESTAMP);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LOCAL_TIME, FlinkSqlOperatorTable.LOCALTIME);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP, FlinkSqlOperatorTable.LOCALTIMESTAMP);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.DATE_FORMAT, FlinkSqlOperatorTable.DATE_FORMAT);
+
+               // collection
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.AT, FlinkSqlOperatorTable.ITEM);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.CARDINALITY, FlinkSqlOperatorTable.CARDINALITY);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ORDER_DESC, FlinkSqlOperatorTable.DESC);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.ARRAY_ELEMENT, FlinkSqlOperatorTable.ELEMENT);
+
+               // crypto hash
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MD5, FlinkSqlOperatorTable.MD5);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SHA2, FlinkSqlOperatorTable.SHA2);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SHA224, FlinkSqlOperatorTable.SHA224);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SHA256, FlinkSqlOperatorTable.SHA256);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SHA384, FlinkSqlOperatorTable.SHA384);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SHA512, FlinkSqlOperatorTable.SHA512);
+               DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.SHA1, FlinkSqlOperatorTable.SHA1);
+               DEFINITION_OPERATOR_MAP.put(
+                       BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP, FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP);
+       }
+
+       /**
+        * Converts the call by looking up the operator registered for its function definition.
+        *
+        * @param call    the call expression to convert
+        * @param context the context used to build the call and to convert the children
+        * @return the operator call on the converted children, or an empty {@link Optional} if no
+        *         operator is registered for the function definition of the call
+        */
+       @Override
+       public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
+               SqlOperator operator = DEFINITION_OPERATOR_MAP.get(call.getFunctionDefinition());
+               return Optional.ofNullable(operator)
+                       .map(op -> context.getRelBuilder().call(op, toRexNodes(context, call.getChildren())));
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
index b1e8e96..cdd236a 100644
--- 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ExpressionConverter.java
@@ -22,71 +22,30 @@ import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.dataformat.Decimal;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
-import org.apache.flink.table.expressions.ExpressionUtils;
 import org.apache.flink.table.expressions.ExpressionVisitor;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
 import org.apache.flink.table.expressions.LocalReferenceExpression;
-import org.apache.flink.table.expressions.ResolvedExpression;
-import org.apache.flink.table.expressions.TableReferenceExpression;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.TypeLiteralExpression;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
-import org.apache.flink.table.expressions.utils.ApiExpressionUtils;
-import org.apache.flink.table.functions.AggregateFunction;
-import org.apache.flink.table.functions.AggregateFunctionDefinition;
-import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
-import org.apache.flink.table.functions.FunctionDefinition;
-import org.apache.flink.table.functions.ScalarFunction;
-import org.apache.flink.table.functions.ScalarFunctionDefinition;
-import org.apache.flink.table.functions.TableFunctionDefinition;
-import org.apache.flink.table.functions.UserDefinedAggregateFunction;
-import org.apache.flink.table.operations.QueryOperation;
 import org.apache.flink.table.planner.calcite.FlinkContext;
-import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
 import org.apache.flink.table.planner.calcite.RexFieldVariable;
 import org.apache.flink.table.planner.expressions.RexNodeExpression;
-import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor;
-import org.apache.flink.table.planner.functions.InternalFunctionDefinitions;
-import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
-import org.apache.flink.table.planner.functions.sql.SqlThrowExceptionFunction;
-import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
+import 
org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule.ConvertContext;
 import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
-import org.apache.flink.util.Preconditions;
 
-import com.google.common.collect.ImmutableList;
 import org.apache.calcite.avatica.util.DateTimeUtils;
 import org.apache.calcite.avatica.util.TimeUnit;
 import org.apache.calcite.avatica.util.TimeUnitRange;
-import org.apache.calcite.rel.RelFieldCollation;
 import org.apache.calcite.rel.type.RelDataType;
 import org.apache.calcite.rex.RexBuilder;
-import org.apache.calcite.rex.RexCall;
-import org.apache.calcite.rex.RexFieldCollation;
 import org.apache.calcite.rex.RexNode;
-import org.apache.calcite.rex.RexSubQuery;
-import org.apache.calcite.rex.RexWindowBound;
-import org.apache.calcite.sql.SqlAggFunction;
-import org.apache.calcite.sql.SqlBasicCall;
-import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlIntervalQualifier;
-import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlLiteral;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperator;
-import org.apache.calcite.sql.SqlPostfixOperator;
-import org.apache.calcite.sql.SqlWindow;
-import org.apache.calcite.sql.fun.SqlTrimFunction;
 import org.apache.calcite.sql.parser.SqlParserPos;
-import org.apache.calcite.sql.type.OrdinalReturnTypeInference;
 import org.apache.calcite.sql.type.SqlTypeName;
 import org.apache.calcite.tools.RelBuilder;
 import org.apache.calcite.util.DateString;
@@ -95,572 +54,45 @@ import org.apache.calcite.util.TimestampString;
 import org.apache.calcite.util.TimestampWithTimeZoneString;
 
 import java.math.BigDecimal;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Date;
-import java.util.HashSet;
-import java.util.IdentityHashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.TimeZone;
 import java.util.stream.Collectors;
 
-import static org.apache.calcite.sql.type.SqlTypeName.VARCHAR;
-import static 
org.apache.flink.table.planner.calcite.FlinkTypeFactory.toLogicalType;
 import static 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
-import static 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isCharacterString;
-import static 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isTemporal;
-import static 
org.apache.flink.table.runtime.typeutils.TypeCheckUtils.isTimeInterval;
-import static 
org.apache.flink.table.types.utils.TypeConversions.fromLegacyInfoToDataType;
 
 /**
  * Visit expression to generator {@link RexNode}.
  */
 public class ExpressionConverter implements ExpressionVisitor<RexNode> {
 
+       private static final List<CallExpressionConvertRule> 
FUNCTION_CONVERT_CHAIN = Arrays.asList(
+               new ScalarFunctionConvertRule(),
+               new OverConvertRule(),
+               new DirectConvertRule(),
+               new CustomizedConvertRule()
+       );
+
        private final RelBuilder relBuilder;
        private final FlinkTypeFactory typeFactory;
 
-       // store mapping from BuiltInFunctionDefinition to it's 
RexNodeConversion.
-       private final Map<FunctionDefinition, RexNodeConversion> 
conversionsOfBuiltInFunc = new IdentityHashMap<>();
-
        public ExpressionConverter(RelBuilder relBuilder) {
                this.relBuilder = relBuilder;
                this.typeFactory = (FlinkTypeFactory) 
relBuilder.getRexBuilder().getTypeFactory();
-
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.CAST, 
exprs -> convertCast(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.REINTERPRET_CAST, exprs 
-> convertReinterpretCast(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.IN, 
exprs -> convertIn(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.GET, 
exprs -> convertGet(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.TRIM, 
exprs -> convertTrim(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.AS, 
exprs -> convertAs(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.OVER, 
exprs -> convertOver(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.IS_NULL, exprs -> 
convertIsNull(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.BETWEEN, exprs -> 
convertBetween(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.NOT_BETWEEN, exprs -> 
convertNotBetween(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.REPLACE, exprs -> 
convertReplace(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.PLUS, 
exprs -> convertPlus(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.CEIL, 
exprs -> convertCeil(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.FLOOR, 
exprs -> convertFloor(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.TEMPORAL_OVERLAPS, 
exprs -> convertTemporalOverlaps(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.TIMESTAMP_DIFF, exprs 
-> convertTimestampDiff(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ARRAY, 
exprs -> convertArray(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ARRAY_ELEMENT, exprs -> 
convertArrayElement(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.MAP, 
exprs -> convertMap(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ROW, 
exprs -> convertRow(exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ORDER_ASC, exprs -> 
convertOrderAsc(exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, 
exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
-               // logic functions
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.AND, 
exprs -> convert(FlinkSqlOperatorTable.AND, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.OR, 
exprs -> convert(FlinkSqlOperatorTable.OR, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.NOT, 
exprs -> convert(FlinkSqlOperatorTable.NOT, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.IF, 
exprs -> convert(FlinkSqlOperatorTable.CASE, exprs));
-
-               // comparison functions
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.EQUALS, exprs 
-> convert(FlinkSqlOperatorTable.EQUALS, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.GREATER_THAN, 
exprs -> convert(FlinkSqlOperatorTable.GREATER_THAN, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
exprs -> convert(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.LESS_THAN, 
exprs -> convert(FlinkSqlOperatorTable.LESS_THAN, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
exprs -> convert(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.NOT_EQUALS, 
exprs -> convert(FlinkSqlOperatorTable.NOT_EQUALS, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.IS_NULL, exprs 
-> convert(FlinkSqlOperatorTable.IS_NULL, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
exprs -> convert(FlinkSqlOperatorTable.IS_NOT_NULL, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.IS_TRUE, exprs 
-> convert(FlinkSqlOperatorTable.IS_TRUE, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.IS_FALSE, exprs 
-> convert(FlinkSqlOperatorTable.IS_FALSE, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
exprs -> convert(FlinkSqlOperatorTable.IS_NOT_TRUE, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
exprs -> convert(FlinkSqlOperatorTable.IS_NOT_FALSE, exprs));
-
-               // string functions
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
exprs -> convert(FlinkSqlOperatorTable.CHAR_LENGTH, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.INIT_CAP, exprs 
-> convert(FlinkSqlOperatorTable.INITCAP, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LIKE, 
exprs -> convert(FlinkSqlOperatorTable.LIKE, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.LOWER, exprs -> 
convert(FlinkSqlOperatorTable.LOWER, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.SIMILAR, exprs 
-> convert(FlinkSqlOperatorTable.SIMILAR_TO, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.SUBSTRING, 
exprs -> convert(FlinkSqlOperatorTable.SUBSTRING, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.UPPER, exprs -> 
convert(FlinkSqlOperatorTable.UPPER, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.POSITION, exprs 
-> convert(FlinkSqlOperatorTable.POSITION, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.OVERLAY, exprs 
-> convert(FlinkSqlOperatorTable.OVERLAY, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.CONCAT, exprs 
-> convert(FlinkSqlOperatorTable.CONCAT_FUNCTION, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.CONCAT_WS, 
exprs -> convert(FlinkSqlOperatorTable.CONCAT_WS, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LPAD, 
exprs -> convert(FlinkSqlOperatorTable.LPAD, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.RPAD, 
exprs -> convert(FlinkSqlOperatorTable.RPAD, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, exprs 
-> convert(FlinkSqlOperatorTable.REGEXP_EXTRACT, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.FROM_BASE64, 
exprs -> convert(FlinkSqlOperatorTable.FROM_BASE64, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.TO_BASE64, 
exprs -> convert(FlinkSqlOperatorTable.TO_BASE64, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.UUID, 
exprs -> convert(FlinkSqlOperatorTable.UUID, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.LTRIM, exprs -> 
convert(FlinkSqlOperatorTable.LTRIM, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.RTRIM, exprs -> 
convert(FlinkSqlOperatorTable.RTRIM, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.REPEAT, exprs 
-> convert(FlinkSqlOperatorTable.REPEAT, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.REGEXP_REPLACE, exprs 
-> convert(FlinkSqlOperatorTable.REGEXP_REPLACE, exprs));
-
-               // math functions
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.MINUS, exprs -> 
convert(FlinkSqlOperatorTable.MINUS, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.DIVIDE, exprs 
-> convert(FlinkSqlOperatorTable.DIVIDE, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.TIMES, exprs -> 
convert(FlinkSqlOperatorTable.MULTIPLY, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ABS, 
exprs -> convert(FlinkSqlOperatorTable.ABS, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.EXP, 
exprs -> convert(FlinkSqlOperatorTable.EXP, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.LOG10, exprs -> 
convert(FlinkSqlOperatorTable.LOG10, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LOG2, 
exprs -> convert(FlinkSqlOperatorTable.LOG2, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LN, 
exprs -> convert(FlinkSqlOperatorTable.LN, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LOG, 
exprs -> convert(FlinkSqlOperatorTable.LOG, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.POWER, exprs -> 
convert(FlinkSqlOperatorTable.POWER, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.MOD, 
exprs -> convert(FlinkSqlOperatorTable.MOD, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SQRT, 
exprs ->
-                               relBuilder.call(FlinkSqlOperatorTable.POWER,
-                                               
convertCallChildren(Arrays.asList(exprs.get(0), 
ApiExpressionUtils.valueLiteral(0.5)))));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.MINUS_PREFIX, 
exprs -> convert(FlinkSqlOperatorTable.UNARY_MINUS, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SIN, 
exprs -> convert(FlinkSqlOperatorTable.SIN, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.COS, 
exprs -> convert(FlinkSqlOperatorTable.COS, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SINH, 
exprs -> convert(FlinkSqlOperatorTable.SINH, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.TAN, 
exprs -> convert(FlinkSqlOperatorTable.TAN, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.TANH, 
exprs -> convert(FlinkSqlOperatorTable.TANH, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.COT, 
exprs -> convert(FlinkSqlOperatorTable.COT, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ASIN, 
exprs -> convert(FlinkSqlOperatorTable.ASIN, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ACOS, 
exprs -> convert(FlinkSqlOperatorTable.ACOS, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.ATAN, 
exprs -> convert(FlinkSqlOperatorTable.ATAN, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.ATAN2, exprs -> 
convert(FlinkSqlOperatorTable.ATAN2, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.COSH, 
exprs -> convert(FlinkSqlOperatorTable.COSH, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.DEGREES, exprs 
-> convert(FlinkSqlOperatorTable.DEGREES, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.RADIANS, exprs 
-> convert(FlinkSqlOperatorTable.RADIANS, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SIGN, 
exprs -> convert(FlinkSqlOperatorTable.SIGN, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.ROUND, exprs -> 
convert(FlinkSqlOperatorTable.ROUND, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.PI, 
exprs -> convert(FlinkSqlOperatorTable.PI, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.E, 
exprs -> convert(FlinkSqlOperatorTable.E, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.RAND, 
exprs -> convert(FlinkSqlOperatorTable.RAND, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.RAND_INTEGER, 
exprs -> convert(FlinkSqlOperatorTable.RAND_INTEGER, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.BIN, 
exprs -> convert(FlinkSqlOperatorTable.BIN, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.HEX, 
exprs -> convert(FlinkSqlOperatorTable.HEX, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.TRUNCATE, exprs 
-> convert(FlinkSqlOperatorTable.TRUNCATE, exprs));
-
-               // time functions
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.EXTRACT, exprs 
-> convert(FlinkSqlOperatorTable.EXTRACT, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.CURRENT_DATE, 
exprs -> convert(FlinkSqlOperatorTable.CURRENT_DATE, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.CURRENT_TIME, 
exprs -> convert(FlinkSqlOperatorTable.CURRENT_TIME, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP, 
exprs -> convert(FlinkSqlOperatorTable.CURRENT_TIMESTAMP, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.LOCAL_TIME, 
exprs -> convert(FlinkSqlOperatorTable.LOCALTIME, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP, exprs 
-> convert(FlinkSqlOperatorTable.LOCALTIMESTAMP, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.DATE_FORMAT, 
exprs -> convert(FlinkSqlOperatorTable.DATE_FORMAT, exprs));
-
-               // collection
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.AT, 
exprs -> convert(FlinkSqlOperatorTable.ITEM, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.CARDINALITY, 
exprs -> convert(FlinkSqlOperatorTable.CARDINALITY, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.ORDER_DESC, 
exprs -> convert(FlinkSqlOperatorTable.DESC, exprs));
-
-               // crypto hash
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.MD5, 
exprs -> convert(FlinkSqlOperatorTable.MD5, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA2, 
exprs -> convert(FlinkSqlOperatorTable.SHA2, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.SHA224, exprs 
-> convert(FlinkSqlOperatorTable.SHA224, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.SHA256, exprs 
-> convert(FlinkSqlOperatorTable.SHA256, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.SHA384, exprs 
-> convert(FlinkSqlOperatorTable.SHA384, exprs));
-               conversionsOfBuiltInFunc
-                               .put(BuiltInFunctionDefinitions.SHA512, exprs 
-> convert(FlinkSqlOperatorTable.SHA512, exprs));
-               conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.SHA1, 
exprs -> convert(FlinkSqlOperatorTable.SHA1, exprs));
-               
conversionsOfBuiltInFunc.put(BuiltInFunctionDefinitions.STREAM_RECORD_TIMESTAMP,
-                               exprs -> 
convert(FlinkSqlOperatorTable.STREAMRECORD_TIMESTAMP, exprs));
-
-               // blink expression
-               
conversionsOfBuiltInFunc.put(InternalFunctionDefinitions.THROW_EXCEPTION, exprs 
-> {
-                       DataType type = ((TypeLiteralExpression) 
exprs.get(1)).getOutputDataType();
-                       return convert(new SqlThrowExceptionFunction(
-                                       
typeFactory.createFieldTypeFromLogicalType(fromDataTypeToLogicalType(type))),
-                                       exprs.subList(0, 1));
-               });
        }
 
        @Override
        public RexNode visit(CallExpression call) {
-               FunctionDefinition func = call.getFunctionDefinition();
-               if (func instanceof ScalarFunctionDefinition) {
-                       ScalarFunction scalaFunc = ((ScalarFunctionDefinition) 
func).getScalarFunction();
-                       List<RexNode> child = 
convertCallChildren(call.getChildren());
-                       SqlFunction sqlFunction = 
UserDefinedFunctionUtils.createScalarSqlFunction(
-                                       scalaFunc.functionIdentifier(),
-                                       scalaFunc.toString(),
-                                       scalaFunc,
-                                       typeFactory);
-                       return relBuilder.call(sqlFunction, child);
-               } else if (func instanceof TableFunctionDefinition) {
-                       throw new RuntimeException("There is no possible reach 
here!");
-               } else if (func instanceof AggregateFunctionDefinition) {
-                       UserDefinedAggregateFunction aggFunc = 
((AggregateFunctionDefinition) func).getAggregateFunction();
-                       if (aggFunc instanceof AggregateFunction) {
-                               SqlFunction aggSqlFunction = 
UserDefinedFunctionUtils.createAggregateSqlFunction(
-                                               aggFunc.functionIdentifier(),
-                                               aggFunc.toString(),
-                                               (AggregateFunction) aggFunc,
-                                               
fromLegacyInfoToDataType(aggFunc.getResultType()),
-                                               
fromLegacyInfoToDataType(aggFunc.getAccumulatorType()),
-                                               typeFactory);
-                               List<RexNode> child = 
convertCallChildren(call.getChildren());
-                               return relBuilder.call(aggSqlFunction, child);
-                       } else {
-                               throw new 
UnsupportedOperationException("TableAggregateFunction is not supported yet!");
+               for (CallExpressionConvertRule rule : FUNCTION_CONVERT_CHAIN) {
+                       Optional<RexNode> converted = rule.convert(call, 
newFunctionContext());
+                       if (converted.isPresent()) {
+                               return converted.get();
                        }
-
-               } else {
-                       FunctionDefinition def = call.getFunctionDefinition();
-                       if (conversionsOfBuiltInFunc.containsKey(def)) {
-                               RexNodeConversion conversion = 
conversionsOfBuiltInFunc.get(def);
-                               return conversion.convert(call.getChildren());
-                       } else {
-                               throw new 
UnsupportedOperationException(def.toString());
-                       }
-               }
-       }
-
-       private List<RexNode> convertCallChildren(List<Expression> children) {
-               return children.stream()
-                               .map(expression -> 
expression.accept(ExpressionConverter.this))
-                               .collect(Collectors.toList());
-       }
-
-       private RexNode convert(SqlOperator sqlOperator, List<Expression> 
children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               return relBuilder.call(sqlOperator, childrenRexNode);
-       }
-
-       private RexNode convertArrayElement(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               return relBuilder.call(FlinkSqlOperatorTable.ELEMENT, 
childrenRexNode);
-       }
-
-       private RexNode convertOrderAsc(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               return childrenRexNode.get(0);
-       }
-
-       private RexNode convertTimestampDiff(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               return relBuilder.call(FlinkSqlOperatorTable.TIMESTAMP_DIFF, 
childrenRexNode.get(0), childrenRexNode.get(2),
-                               childrenRexNode.get(1));
-       }
-
-       private RexNode convertIsNull(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               return relBuilder.isNull(childrenRexNode.get(0));
-       }
-
-       private RexNode convertNotBetween(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               Preconditions.checkArgument(childrenRexNode.size() == 3);
-               RexNode expr = childrenRexNode.get(0);
-               RexNode lowerBound = childrenRexNode.get(1);
-               RexNode upperBound = childrenRexNode.get(2);
-               return relBuilder.or(
-                               
relBuilder.call(FlinkSqlOperatorTable.LESS_THAN, expr, lowerBound),
-                               
relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN, expr, upperBound));
-       }
-
-       private RexNode convertBetween(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               Preconditions.checkArgument(childrenRexNode.size() == 3);
-               RexNode expr = childrenRexNode.get(0);
-               RexNode lowerBound = childrenRexNode.get(1);
-               RexNode upperBound = childrenRexNode.get(2);
-               return relBuilder.and(
-                               
relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, expr, lowerBound),
-                               
relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, expr, upperBound));
-       }
-
-       private RexNode convertCeil(List<Expression> children) {
-               Preconditions.checkArgument(children.size() == 1 || 
children.size() == 2);
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               if (children.size() == 1) {
-                       return relBuilder.call(FlinkSqlOperatorTable.CEIL, 
childrenRexNode);
-               } else {
-                       return relBuilder.call(FlinkSqlOperatorTable.CEIL, 
childrenRexNode.get(1), childrenRexNode.get(0));
-               }
-       }
-
-       private RexNode convertFloor(List<Expression> children) {
-               Preconditions.checkArgument(children.size() == 1 || 
children.size() == 2);
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               if (children.size() == 1) {
-                       return relBuilder.call(FlinkSqlOperatorTable.FLOOR, 
childrenRexNode);
-               } else {
-                       return relBuilder.call(FlinkSqlOperatorTable.FLOOR, 
childrenRexNode.get(1), childrenRexNode.get(0));
-               }
-       }
-
-       private RexNode convertArray(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               ArrayType arrayType = new 
ArrayType(toLogicalType(childrenRexNode.get(0).getType()));
-               // TODO get type from CallExpression directly
-               RelDataType relDataType = 
typeFactory.createFieldTypeFromLogicalType(arrayType);
-               return relBuilder.getRexBuilder().makeCall(relDataType, 
FlinkSqlOperatorTable.ARRAY_VALUE_CONSTRUCTOR, childrenRexNode);
-       }
-
-       private RexNode convertMap(List<Expression> children) {
-               Preconditions.checkArgument(!children.isEmpty() && 
children.size() % 2 == 0);
-               // TODO get type from CallExpression directly
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               RelDataType keyType = childrenRexNode.get(0).getType();
-               RelDataType valueType = 
childrenRexNode.get(childrenRexNode.size() - 1).getType();
-               RelDataType mapType = typeFactory.createMapType(keyType, 
valueType);
-               return relBuilder.getRexBuilder().makeCall(mapType, 
FlinkSqlOperatorTable.MAP_VALUE_CONSTRUCTOR, childrenRexNode);
-       }
-
-       private RexNode convertRow(List<Expression> children) {
-               // TODO get type from CallExpression directly
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               LogicalType[] childTypes = childrenRexNode.stream().map(rexNode 
-> toLogicalType(rexNode.getType()))
-                               .toArray(LogicalType[]::new);
-               RowType rowType = RowType.of(childTypes);
-               RelDataType relDataType = 
typeFactory.createFieldTypeFromLogicalType(rowType);
-               return relBuilder.getRexBuilder().makeCall(relDataType, 
FlinkSqlOperatorTable.ROW, childrenRexNode);
-       }
-
-       private RexNode convertTemporalOverlaps(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               // Standard conversion of the OVERLAPS operator.
-               // Source: 
[[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
-               RexNode leftTimePoint = childrenRexNode.get(0);
-               RexNode leftTemporal = childrenRexNode.get(1);
-               RexNode rightTimePoint = childrenRexNode.get(2);
-               RexNode rightTemporal = childrenRexNode.get(3);
-               RexNode convLeftT;
-               if (isTimeInterval(toLogicalType(leftTemporal.getType()))) {
-                       convLeftT = 
relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, leftTimePoint, 
leftTemporal);
-               } else {
-                       convLeftT = leftTemporal;
                }
-               // sort end points into start and end, such that (s0 <= e0) and 
(s1 <= e1).
-               RexNode leftLe = 
relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, leftTimePoint, 
convLeftT);
-               RexNode s0 = relBuilder.call(FlinkSqlOperatorTable.CASE, 
leftLe, leftTimePoint, convLeftT);
-               RexNode e0 = relBuilder.call(FlinkSqlOperatorTable.CASE, 
leftLe, convLeftT, leftTimePoint);
-               RexNode convRightT;
-               if (isTimeInterval(toLogicalType(rightTemporal.getType()))) {
-                       convRightT = 
relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, rightTimePoint, 
rightTemporal);
-               } else {
-                       convRightT = rightTemporal;
-               }
-               RexNode rightLe = 
relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, rightTimePoint, 
convRightT);
-               RexNode s1 = relBuilder.call(FlinkSqlOperatorTable.CASE, 
rightLe, rightTimePoint, convRightT);
-               RexNode e1 = relBuilder.call(FlinkSqlOperatorTable.CASE, 
rightLe, convRightT, rightTimePoint);
-
-               // (e0 >= s1) AND (e1 >= s0)
-               RexNode leftPred = 
relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e0, s1);
-               RexNode rightPred = 
relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e1, s0);
-               return relBuilder.call(FlinkSqlOperatorTable.AND, leftPred, 
rightPred);
-       }
-
-       private RexNode convertPlus(List<Expression> children) {
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               if 
(isCharacterString(toLogicalType(childrenRexNode.get(0).getType()))) {
-                       return relBuilder.call(
-                                       FlinkSqlOperatorTable.CONCAT,
-                                       childrenRexNode.get(0),
-                                       relBuilder.cast(childrenRexNode.get(1), 
VARCHAR));
-               } else if 
(isCharacterString(toLogicalType(childrenRexNode.get(1).getType()))) {
-                       return relBuilder.call(
-                                       FlinkSqlOperatorTable.CONCAT,
-                                       relBuilder.cast(childrenRexNode.get(0), 
VARCHAR),
-                                       childrenRexNode.get(1));
-               } else if 
(isTimeInterval(toLogicalType(childrenRexNode.get(0).getType())) &&
-                               childrenRexNode.get(0).getType() == 
childrenRexNode.get(1).getType()) {
-                       return relBuilder.call(FlinkSqlOperatorTable.PLUS, 
childrenRexNode);
-               } else if 
(isTimeInterval(toLogicalType(childrenRexNode.get(0).getType()))
-                               && 
isTemporal(toLogicalType(childrenRexNode.get(1).getType()))) {
-                       // Calcite has a bug that can't apply INTERVAL + 
DATETIME (INTERVAL at left)
-                       // we manually switch them here
-                       return 
relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, childrenRexNode.get(1), 
childrenRexNode.get(0));
-               } else if 
(isTemporal(toLogicalType(childrenRexNode.get(0).getType())) &&
-                               
isTemporal(toLogicalType(childrenRexNode.get(1).getType()))) {
-                       return 
relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, childrenRexNode);
-               } else {
-                       return relBuilder.call(FlinkSqlOperatorTable.PLUS, 
childrenRexNode);
-               }
-       }
-
-       private RexNode convertReplace(List<Expression> children) {
-               Preconditions.checkArgument(children.size() == 2 || 
children.size() == 3);
-               List<RexNode> childrenRexNode = convertCallChildren(children);
-               if (children.size() == 2) {
-                       return relBuilder.call(
-                                       FlinkSqlOperatorTable.REPLACE,
-                                       childrenRexNode.get(0),
-                                       childrenRexNode.get(1),
-                                       
relBuilder.call(FlinkSqlOperatorTable.CHAR_LENGTH, childrenRexNode.get(0)));
-               } else {
-                       return relBuilder.call(FlinkSqlOperatorTable.REPLACE, 
childrenRexNode);
-               }
-       }
-
-       private RexNode convertOver(List<Expression> children) {
-               List<Expression> args = children;
-               Expression agg = args.get(0);
-               SqlAggFunction aggFunc = agg.accept(new 
SqlAggFunctionVisitor(typeFactory));
-               RelDataType aggResultType = 
typeFactory.createFieldTypeFromLogicalType(
-                               fromDataTypeToLogicalType(((ResolvedExpression) 
agg).getOutputDataType()));
-
-               // assemble exprs by agg children
-               List<RexNode> aggExprs = agg.getChildren().stream().map(expr -> 
expr.accept(this))
-                               .collect(Collectors.toList());
-
-               // assemble order by key
-               Expression orderKeyExpr = args.get(1);
-               Set<SqlKind> kinds = new HashSet<>();
-               RexNode collationRexNode = 
createCollation(orderKeyExpr.accept(this), 
RelFieldCollation.Direction.ASCENDING,
-                               null, kinds);
-               ImmutableList<RexFieldCollation> orderKey = ImmutableList
-                               .of(new RexFieldCollation(collationRexNode, 
kinds));
-
-               // assemble partition by keys
-               List<RexNode> partitionKeys = args.subList(4, 
args.size()).stream().map(expr -> expr.accept(this))
-                               .collect(Collectors.toList());
-               // assemble bounds
-               Expression preceding = args.get(2);
-               boolean isPhysical = LogicalTypeChecks.hasRoot(
-                               fromDataTypeToLogicalType(((ResolvedExpression) 
preceding).getOutputDataType()),
-                               LogicalTypeRoot.BIGINT);
-               Expression following = args.get(3);
-               RexWindowBound lowerBound = createBound(preceding, 
SqlKind.PRECEDING);
-               RexWindowBound upperBound = createBound(following, 
SqlKind.FOLLOWING);
-
-               // build RexOver
-               return relBuilder.getRexBuilder().makeOver(
-                               aggResultType,
-                               aggFunc,
-                               aggExprs,
-                               partitionKeys,
-                               orderKey,
-                               lowerBound,
-                               upperBound,
-                               isPhysical,
-                               true,
-                               false,
-                               false);
-       }
-
-       private RexNode convertAs(List<Expression> children) {
-               String name = extractValue((ValueLiteralExpression) 
children.get(1), String.class);
-               RexNode child = children.get(0).accept(this);
-               return relBuilder.alias(child, name);
-       }
-
-       private RexNode convertTrim(List<Expression> children) {
-               ValueLiteralExpression removeLeadingExpr = 
(ValueLiteralExpression) children.get(0);
-               Boolean removeLeading = extractValue(removeLeadingExpr, 
Boolean.class);
-               ValueLiteralExpression removeTrailingExpr = 
(ValueLiteralExpression) children.get(1);
-               Boolean removeTrailing = extractValue(removeTrailingExpr, 
Boolean.class);
-               RexNode trimString = children.get(2).accept(this);
-               RexNode str = children.get(3).accept(this);
-               Enum trimMode;
-               if (removeLeading && removeTrailing) {
-                       trimMode = SqlTrimFunction.Flag.BOTH;
-               } else if (removeLeading) {
-                       trimMode = SqlTrimFunction.Flag.LEADING;
-               } else if (removeTrailing) {
-                       trimMode = SqlTrimFunction.Flag.TRAILING;
-               } else {
-                       throw new IllegalArgumentException("Unsupported trim 
mode.");
-               }
-               return relBuilder.call(
-                               FlinkSqlOperatorTable.TRIM,
-                               relBuilder.getRexBuilder().makeFlag(trimMode),
-                               trimString,
-                               str);
-       }
-
-       private RexNode convertGet(List<Expression> children) {
-               RexNode child = children.get(0).accept(this);
-               ValueLiteralExpression keyLiteral = (ValueLiteralExpression) 
children.get(1);
-               Optional<Integer> indexOptional = 
ExpressionUtils.extractValue(keyLiteral, String.class).map(
-                               child.getType().getFieldNames()::indexOf);
-               // Note: never replace the following code with :
-               // int index = indexOptional.orElseGet(() -> 
extractValue(keyLiteral, Integer.class));
-               // Because the logical in `orElseGet` always executed no matter 
whether indexOptional is present or not.
-               int index;
-               if (indexOptional.isPresent()) {
-                       index = indexOptional.get();
-               } else {
-                       index = extractValue(keyLiteral, Integer.class);
-               }
-               return relBuilder.getRexBuilder().makeFieldAccess(child, index);
-       }
-
-       private RexNode convertIn(List<Expression> children) {
-               Expression headExpr = children.get(1);
-               if (headExpr instanceof TableReferenceExpression) {
-                       QueryOperation tableOperation = 
((TableReferenceExpression) headExpr).getQueryOperation();
-                       RexNode child = children.get(0).accept(this);
-                       return RexSubQuery.in(
-                                       ((FlinkRelBuilder) 
relBuilder).queryOperation(tableOperation).build(),
-                                       ImmutableList.of(child));
-               } else {
-                       List<RexNode> child = convertCallChildren(children);
-                       return relBuilder.call(FlinkSqlOperatorTable.IN, child);
-               }
-       }
-
-       private RexNode convertReinterpretCast(List<Expression> children) {
-               RexNode child = children.get(0).accept(this);
-               TypeLiteralExpression type = (TypeLiteralExpression) 
children.get(1);
-               RexNode checkOverflow = children.get(2).accept(this);
-               return relBuilder.getRexBuilder().makeReinterpretCast(
-                               typeFactory.createFieldTypeFromLogicalType(
-                                               
type.getOutputDataType().getLogicalType().copy(child.getType().isNullable())),
-                               child,
-                               checkOverflow);
-       }
-
-       private RexNode convertCast(List<Expression> children) {
-               RexNode child = children.get(0).accept(this);
-               TypeLiteralExpression type = (TypeLiteralExpression) 
children.get(1);
-               return relBuilder.getRexBuilder().makeAbstractCast(
-                               typeFactory.createFieldTypeFromLogicalType(
-                                               
type.getOutputDataType().getLogicalType().copy(child.getType().isNullable())),
-                               child);
+               throw new RuntimeException("Unknown call expression: " + call);
        }
 
        @Override
@@ -728,140 +160,23 @@ public class ExpressionConverter implements 
ExpressionVisitor<RexNode> {
                }
                Object object = extractValue(valueLiteral, Object.class);
                if (object instanceof TimePointUnit) {
-                       TimeUnit value;
-                       switch ((TimePointUnit) object) {
-                               case YEAR:
-                                       value = TimeUnit.YEAR;
-                                       break;
-                               case MONTH:
-                                       value = TimeUnit.MONTH;
-                                       break;
-                               case DAY:
-                                       value = TimeUnit.DAY;
-                                       break;
-                               case HOUR:
-                                       value = TimeUnit.HOUR;
-                                       break;
-                               case MINUTE:
-                                       value = TimeUnit.MINUTE;
-                                       break;
-                               case SECOND:
-                                       value = TimeUnit.SECOND;
-                                       break;
-                               case QUARTER:
-                                       value = TimeUnit.QUARTER;
-                                       break;
-                               case WEEK:
-                                       value = TimeUnit.WEEK;
-                                       break;
-                               case MILLISECOND:
-                                       value = TimeUnit.MILLISECOND;
-                                       break;
-                               case MICROSECOND:
-                                       value = TimeUnit.MICROSECOND;
-                                       break;
-                               default:
-                                       throw new 
UnsupportedOperationException();
-                       }
+                       TimeUnit value = 
timePointUnitToTimeUnit((TimePointUnit) object);
                        return relBuilder.getRexBuilder().makeFlag(value);
                } else if (object instanceof TimeIntervalUnit) {
-                       TimeUnitRange value;
-                       switch ((TimeIntervalUnit) object) {
-                               case YEAR:
-                                       value = TimeUnitRange.YEAR;
-                                       break;
-                               case YEAR_TO_MONTH:
-                                       value = TimeUnitRange.YEAR_TO_MONTH;
-                                       break;
-                               case QUARTER:
-                                       value = TimeUnitRange.QUARTER;
-                                       break;
-                               case MONTH:
-                                       value = TimeUnitRange.MONTH;
-                                       break;
-                               case WEEK:
-                                       value = TimeUnitRange.WEEK;
-                                       break;
-                               case DAY:
-                                       value = TimeUnitRange.DAY;
-                                       break;
-                               case DAY_TO_HOUR:
-                                       value = TimeUnitRange.DAY_TO_HOUR;
-                                       break;
-                               case DAY_TO_MINUTE:
-                                       value = TimeUnitRange.DAY_TO_MINUTE;
-                                       break;
-                               case DAY_TO_SECOND:
-                                       value = TimeUnitRange.DAY_TO_SECOND;
-                                       break;
-                               case HOUR:
-                                       value = TimeUnitRange.HOUR;
-                                       break;
-                               case SECOND:
-                                       value = TimeUnitRange.SECOND;
-                                       break;
-                               case HOUR_TO_MINUTE:
-                                       value = TimeUnitRange.HOUR_TO_MINUTE;
-                                       break;
-                               case HOUR_TO_SECOND:
-                                       value = TimeUnitRange.HOUR_TO_SECOND;
-                                       break;
-                               case MINUTE:
-                                       value = TimeUnitRange.MINUTE;
-                                       break;
-                               case MINUTE_TO_SECOND:
-                                       value = TimeUnitRange.MINUTE_TO_SECOND;
-                                       break;
-                               default:
-                                       throw new 
UnsupportedOperationException();
-                       }
+                       TimeUnitRange value = 
intervalUnitToUnitRange((TimeIntervalUnit) object);
                        return relBuilder.getRexBuilder().makeFlag(value);
                } else {
                        return relBuilder.literal(extractValue(valueLiteral, 
Object.class));
                }
        }
 
-       /**
-        * Extracts a value from a literal. Including planner-specific 
instances such as {@link Decimal}.
-        */
-       @SuppressWarnings("unchecked")
-       private static <T> T extractValue(ValueLiteralExpression literal, 
Class<T> clazz) {
-               final Optional<Object> possibleObject = 
literal.getValueAs(Object.class);
-               if (!possibleObject.isPresent()) {
-                       throw new TableException("Invalid literal.");
-               }
-               final Object object = possibleObject.get();
-
-               if (clazz.equals(BigDecimal.class)) {
-                       final Optional<BigDecimal> possibleDecimal = 
literal.getValueAs(BigDecimal.class);
-                       if (possibleDecimal.isPresent()) {
-                               return (T) possibleDecimal.get();
-                       }
-                       if (object instanceof Decimal) {
-                               return (T) ((Decimal) object).toBigDecimal();
-                       }
-               }
-
-               return literal.getValueAs(clazz)
-                               .orElseThrow(() -> new 
TableException("Unsupported literal class: " + clazz));
-       }
-
-       /**
-        * Convert a Date value to a Calendar. Calcite's fromCalendarField 
functions use the
-        * Calendar.get methods, so the raw values of the individual fields are 
preserved when
-        * converted to the String formats.
-        *
-        * @return get the Calendar value
-        */
-       private static Calendar valueAsCalendar(Object value) {
-               Date date = (Date) value;
-               Calendar cal = Calendar.getInstance();
-               cal.setTime(date);
-               return cal;
-       }
-
        @Override
        public RexNode visit(FieldReferenceExpression fieldReference) {
+               // We cannot use inputCount + inputIndex + fieldIndex to
+               // construct the Calcite field, so it is resolved by name.
+               // Calcite shuffles the output order of groupings (RelBuilder.aggregate
+               // stores them in an ImmutableBitSet), which changes the field order.
+               // See QueryOperationConverter.SingleRelVisitor.visit(AggregateQueryOperation).
                return relBuilder.field(fieldReference.getName());
        }
 
@@ -885,85 +200,129 @@ public class ExpressionConverter implements 
ExpressionVisitor<RexNode> {
                }
        }
 
-       private RexNode createCollation(RexNode node, 
RelFieldCollation.Direction direction,
-                       RelFieldCollation.NullDirection nullDirection, 
Set<SqlKind> kinds) {
-               switch (node.getKind()) {
-                       case DESCENDING:
-                               kinds.add(node.getKind());
-                               return createCollation(((RexCall) 
node).getOperands().get(0), RelFieldCollation.Direction.DESCENDING,
-                                               nullDirection, kinds);
-                       case NULLS_FIRST:
-                               kinds.add(node.getKind());
-                               return createCollation(((RexCall) 
node).getOperands().get(0), direction,
-                                               
RelFieldCollation.NullDirection.FIRST, kinds);
-                       case NULLS_LAST:
-                               kinds.add(node.getKind());
-                               return createCollation(((RexCall) 
node).getOperands().get(0), direction,
-                                               
RelFieldCollation.NullDirection.LAST, kinds);
+       /**
+        * Converts every expression in the list to a {@link RexNode} through the given context.
+        *
+        * @param context the context whose {@code toRexNode} performs each conversion
+        * @param expr the expressions to convert
+        * @return the converted nodes, in the encounter order of {@code expr}
+        */
+       public static List<RexNode> toRexNodes(ConvertContext context, 
List<Expression> expr) {
+               return 
expr.stream().map(context::toRexNode).collect(Collectors.toList());
+       }
+
+       /**
+        * Creates a {@link ConvertContext} backed by this converter: expressions are converted by
+        * visiting them with this {@code ExpressionConverter}, and the context hands out this
+        * converter's {@code relBuilder} and {@code typeFactory}.
+        */
+       private ConvertContext newFunctionContext() {
+               return new ConvertContext() {
+                       @Override
+                       public RexNode toRexNode(Expression expr) {
+                               return expr.accept(ExpressionConverter.this);
+                       }
+
+                       @Override
+                       public RelBuilder getRelBuilder() {
+                               return relBuilder;
+                       }
+
+                       @Override
+                       public FlinkTypeFactory getTypeFactory() {
+                               return typeFactory;
+                       }
+               };
+       }
+
+       /**
+        * Maps a {@link TimePointUnit} to the {@link TimeUnit} constant of the same name.
+        *
+        * @param unit the time point unit to translate
+        * @return the matching {@link TimeUnit}
+        * @throws UnsupportedOperationException if {@code unit} has no case in the switch
+        */
+       private static TimeUnit timePointUnitToTimeUnit(TimePointUnit unit) {
+               switch (unit) {
+                       case YEAR:
+                               return TimeUnit.YEAR;
+                       case MONTH:
+                               return TimeUnit.MONTH;
+                       case DAY:
+                               return TimeUnit.DAY;
+                       case HOUR:
+                               return TimeUnit.HOUR;
+                       case MINUTE:
+                               return TimeUnit.MINUTE;
+                       case SECOND:
+                               return TimeUnit.SECOND;
+                       case QUARTER:
+                               return TimeUnit.QUARTER;
+                       case WEEK:
+                               return TimeUnit.WEEK;
+                       case MILLISECOND:
+                               return TimeUnit.MILLISECOND;
+                       case MICROSECOND:
+                               return TimeUnit.MICROSECOND;
                        default:
-                               if (nullDirection == null) {
-                                       // Set the null direction if not 
specified.
-                                       // Consistent with 
HIVE/SPARK/MYSQL/BLINK-RUNTIME.
-                                       if 
(FlinkPlannerImpl.defaultNullCollation()
-                                                       
.last(direction.equals(RelFieldCollation.Direction.DESCENDING))) {
-                                               kinds.add(SqlKind.NULLS_LAST);
-                                       } else {
-                                               kinds.add(SqlKind.NULLS_FIRST);
-                                       }
-                               }
-                               return node;
+                               throw new 
UnsupportedOperationException("TimePointUnit is: " + unit);
                }
        }
 
-       private RexWindowBound createBound(Expression bound, SqlKind sqlKind) {
-               if (bound instanceof CallExpression) {
-                       CallExpression callExpr = (CallExpression) bound;
-                       FunctionDefinition func = 
callExpr.getFunctionDefinition();
-                       if 
(BuiltInFunctionDefinitions.UNBOUNDED_ROW.equals(func) || 
BuiltInFunctionDefinitions.UNBOUNDED_RANGE
-                                       .equals(func)) {
-                               SqlNode unbounded = 
sqlKind.equals(SqlKind.PRECEDING) ? SqlWindow
-                                               
.createUnboundedPreceding(SqlParserPos.ZERO) :
-                                               
SqlWindow.createUnboundedFollowing(SqlParserPos.ZERO);
-                               return RexWindowBound.create(unbounded, null);
-                       } else if 
(BuiltInFunctionDefinitions.CURRENT_ROW.equals(func) || 
BuiltInFunctionDefinitions.CURRENT_RANGE
-                                       .equals(func)) {
-                               SqlNode currentRow = 
SqlWindow.createCurrentRow(SqlParserPos.ZERO);
-                               return RexWindowBound.create(currentRow, null);
-                       } else {
-                               throw new IllegalArgumentException("Unexpected 
expression: " + bound);
-                       }
-               } else if (bound instanceof ValueLiteralExpression) {
-                       RelDataType returnType = typeFactory
-                                       .createFieldTypeFromLogicalType(new 
DecimalType(true, 19, 0));
-                       SqlOperator sqlOperator = new SqlPostfixOperator(
-                                       sqlKind.name(),
-                                       sqlKind,
-                                       2,
-                                       new OrdinalReturnTypeInference(0),
-                                       null,
-                                       null);
-                       SqlNode[] operands = new SqlNode[] { 
SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO) };
-                       SqlNode node = new SqlBasicCall(sqlOperator, operands, 
SqlParserPos.ZERO);
+       /**
+        * Maps a {@link TimeIntervalUnit} to the {@link TimeUnitRange} constant of the same name.
+        *
+        * @param intervalUnit the interval unit to translate
+        * @return the matching {@link TimeUnitRange}
+        * @throws UnsupportedOperationException if {@code intervalUnit} has no case in the switch
+        */
+       private static TimeUnitRange intervalUnitToUnitRange(TimeIntervalUnit intervalUnit) {
+               switch (intervalUnit) {
+                       // year / month based units
+                       case YEAR:
+                               return TimeUnitRange.YEAR;
+                       case YEAR_TO_MONTH:
+                               return TimeUnitRange.YEAR_TO_MONTH;
+                       case QUARTER:
+                               return TimeUnitRange.QUARTER;
+                       case MONTH:
+                               return TimeUnitRange.MONTH;
+                       case WEEK:
+                               return TimeUnitRange.WEEK;
+                       // day based units
+                       case DAY:
+                               return TimeUnitRange.DAY;
+                       case DAY_TO_HOUR:
+                               return TimeUnitRange.DAY_TO_HOUR;
+                       case DAY_TO_MINUTE:
+                               return TimeUnitRange.DAY_TO_MINUTE;
+                       case DAY_TO_SECOND:
+                               return TimeUnitRange.DAY_TO_SECOND;
+                       // hour based units
+                       case HOUR:
+                               return TimeUnitRange.HOUR;
+                       case HOUR_TO_MINUTE:
+                               return TimeUnitRange.HOUR_TO_MINUTE;
+                       case HOUR_TO_SECOND:
+                               return TimeUnitRange.HOUR_TO_SECOND;
+                       // minute and second based units
+                       case MINUTE:
+                               return TimeUnitRange.MINUTE;
+                       case MINUTE_TO_SECOND:
+                               return TimeUnitRange.MINUTE_TO_SECOND;
+                       case SECOND:
+                               return TimeUnitRange.SECOND;
+                       default:
+                               throw new UnsupportedOperationException("TimeIntervalUnit is: " + intervalUnit);
+               }
+       }
 
-                       ValueLiteralExpression literalExpr = 
(ValueLiteralExpression) bound;
-                       RexNode literalRexNode = 
literalExpr.getValueAs(Double.class).map(
-                                       v -> 
relBuilder.literal(BigDecimal.valueOf((Double) v))).orElse(
-                                       
relBuilder.literal(extractValue(literalExpr, Object.class)));
+       /**
+        * Extracts the value of a literal as an instance of the requested class, including
+        * planner-specific instances such as {@link Decimal}.
+        *
+        * <p>When {@link BigDecimal} is requested, the literal's own BigDecimal conversion is tried
+        * first; otherwise a {@link Decimal} value is converted with {@code toBigDecimal()}.
+        *
+        * @param literal the literal expression to read the value from
+        * @param clazz the class the value should be returned as
+        * @return the literal value as an instance of {@code clazz}
+        * @throws TableException if the literal holds no value or cannot be provided as {@code clazz}
+        */
+       @SuppressWarnings("unchecked")
+       public static <T> T extractValue(ValueLiteralExpression literal, 
Class<T> clazz) {
+               final Optional<Object> possibleObject = 
literal.getValueAs(Object.class);
+               if (!possibleObject.isPresent()) {
+                       throw new TableException("Invalid literal.");
+               }
+               final Object object = possibleObject.get();
 
-                       List<RexNode> expressions = new ArrayList<>();
-                       expressions.add(literalRexNode);
-                       RexNode rexNode = 
relBuilder.getRexBuilder().makeCall(returnType, sqlOperator, expressions);
-                       return RexWindowBound.create(node, rexNode);
-               } else {
-                       throw new TableException("Unexpected expression: " + 
bound);
+               if (clazz.equals(BigDecimal.class)) {
+                       final Optional<BigDecimal> possibleDecimal = 
literal.getValueAs(BigDecimal.class);
+                       if (possibleDecimal.isPresent()) {
+                               return (T) possibleDecimal.get();
+                       }
+                       if (object instanceof Decimal) {
+                               return (T) ((Decimal) object).toBigDecimal();
+                       }
                }
+
+               return literal.getValueAs(clazz)
+                       .orElseThrow(() -> new TableException("Unsupported 
literal class: " + clazz));
        }
 
        /**
-        * RexNodeConversion to define how to convert a {@link CallExpression} 
which
-        * has built-in FunctionDefinition to RexNode.
+        * Converts a {@link Date} value to a {@link Calendar}. Calcite's fromCalendarField functions
+        * use the Calendar.get methods, so the raw values of the individual fields are preserved
+        * when converted to the String formats.
+        *
+        * @param value the value to convert; it is cast to {@link Date}
+        * @return a Calendar obtained from {@code Calendar.getInstance()} and set to the given date
         */
-       private interface RexNodeConversion {
-               RexNode convert(List<Expression> children);
+       private static Calendar valueAsCalendar(Object value) {
+               Date date = (Date) value;
+               Calendar cal = Calendar.getInstance();
+               cal.setTime(date);
+               return cal;
        }
 }
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
new file mode 100644
index 0000000..7d8173c
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/OverConvertRule.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.expressions.Expression;
+import org.apache.flink.table.expressions.ResolvedExpression;
+import org.apache.flink.table.expressions.ValueLiteralExpression;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
+import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.expressions.SqlAggFunctionVisitor;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.calcite.rel.RelFieldCollation;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rex.RexCall;
+import org.apache.calcite.rex.RexFieldCollation;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexWindowBound;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlLiteral;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlPostfixOperator;
+import org.apache.calcite.sql.SqlWindow;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.sql.type.OrdinalReturnTypeInference;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.extractValue;
+import static 
org.apache.flink.table.runtime.types.LogicalTypeDataTypeConverter.fromDataTypeToLogicalType;
+
+/**
+ * A {@link CallExpressionConvertRule} that converts {@link BuiltInFunctionDefinitions#OVER}.
+ *
+ * <p>The children of an OVER call are read positionally: the aggregate call (0), the order-by
+ * key (1), the preceding bound (2), the following bound (3) and, from index 4 on, the
+ * partition-by keys.
+ */
+public class OverConvertRule implements CallExpressionConvertRule {
+
+       /**
+        * Converts an OVER call into a Calcite over call.
+        *
+        * @param call the call expression to convert
+        * @param context supplies the type factory, the RelBuilder and the conversion of children
+        * @return the converted node, or {@link Optional#empty()} if {@code call} is not an OVER call
+        */
+       @Override
+       public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
+               if (call.getFunctionDefinition() != BuiltInFunctionDefinitions.OVER) {
+                       // not an OVER call, leave it to the other rules
+                       return Optional.empty();
+               }
+
+               List<Expression> children = call.getChildren();
+               FlinkTypeFactory typeFactory = context.getTypeFactory();
+
+               // children[0]: the aggregate call
+               Expression agg = children.get(0);
+               SqlAggFunction aggFunc = agg.accept(new SqlAggFunctionVisitor(typeFactory));
+               RelDataType aggResultType = typeFactory.createFieldTypeFromLogicalType(
+                       fromDataTypeToLogicalType(((ResolvedExpression) agg).getOutputDataType()));
+
+               // assemble exprs by agg children
+               List<RexNode> aggExprs = agg.getChildren().stream()
+                       .map(context::toRexNode)
+                       .collect(Collectors.toList());
+
+               // children[1]: the order-by key; createCollation unwraps it and fills kinds
+               Expression orderKeyExpr = children.get(1);
+               Set<SqlKind> kinds = new HashSet<>();
+               RexNode collationRexNode = createCollation(
+                       context.toRexNode(orderKeyExpr), RelFieldCollation.Direction.ASCENDING, null, kinds);
+               ImmutableList<RexFieldCollation> orderKey =
+                       ImmutableList.of(new RexFieldCollation(collationRexNode, kinds));
+
+               // children[4..]: the partition-by keys
+               List<RexNode> partitionKeys = children.subList(4, children.size()).stream()
+                       .map(context::toRexNode)
+                       .collect(Collectors.toList());
+
+               // children[2] / children[3]: the preceding and following bounds;
+               // isPhysical is true when the preceding bound's type root is BIGINT
+               Expression preceding = children.get(2);
+               boolean isPhysical = LogicalTypeChecks.hasRoot(
+                       fromDataTypeToLogicalType(((ResolvedExpression) preceding).getOutputDataType()),
+                       LogicalTypeRoot.BIGINT);
+               Expression following = children.get(3);
+               RexWindowBound lowerBound = createBound(context, preceding, SqlKind.PRECEDING);
+               RexWindowBound upperBound = createBound(context, following, SqlKind.FOLLOWING);
+
+               // build RexOver
+               return Optional.of(context.getRelBuilder().getRexBuilder().makeOver(
+                       aggResultType,
+                       aggFunc,
+                       aggExprs,
+                       partitionKeys,
+                       orderKey,
+                       lowerBound,
+                       upperBound,
+                       isPhysical,
+                       true, // allowPartial
+                       false, // nullWhenCountZero
+                       false)); // distinct
+       }
+
+       /**
+        * Unwraps DESCENDING / NULLS_FIRST / NULLS_LAST calls around an order key and records every
+        * unwrapped kind in {@code kinds}.
+        *
+        * <p>If no null direction was specified, a default one is added to {@code kinds}, chosen from
+        * {@code FlinkPlannerImpl.defaultNullCollation()} and the direction found so far.
+        *
+        * @param node the (possibly wrapped) order key
+        * @param direction the sort direction determined so far
+        * @param nullDirection the null direction determined so far, or {@code null} if unspecified
+        * @param kinds receives the collation kinds found while unwrapping; modified in place
+        * @return the first nested node that is not one of the wrapper calls above
+        */
+       private static RexNode createCollation(
+                       RexNode node,
+                       RelFieldCollation.Direction direction,
+                       RelFieldCollation.NullDirection nullDirection,
+                       Set<SqlKind> kinds) {
+               switch (node.getKind()) {
+                       case DESCENDING:
+                               kinds.add(node.getKind());
+                               return createCollation(
+                                       ((RexCall) node).getOperands().get(0),
+                                       RelFieldCollation.Direction.DESCENDING,
+                                       nullDirection,
+                                       kinds);
+                       case NULLS_FIRST:
+                               kinds.add(node.getKind());
+                               return createCollation(
+                                       ((RexCall) node).getOperands().get(0),
+                                       direction,
+                                       RelFieldCollation.NullDirection.FIRST,
+                                       kinds);
+                       case NULLS_LAST:
+                               kinds.add(node.getKind());
+                               return createCollation(
+                                       ((RexCall) node).getOperands().get(0),
+                                       direction,
+                                       RelFieldCollation.NullDirection.LAST,
+                                       kinds);
+                       default:
+                               if (nullDirection == null) {
+                                       // Set the null direction if not specified.
+                                       // Consistent with HIVE/SPARK/MYSQL/BLINK-RUNTIME.
+                                       boolean descending = direction.equals(RelFieldCollation.Direction.DESCENDING);
+                                       if (FlinkPlannerImpl.defaultNullCollation().last(descending)) {
+                                               kinds.add(SqlKind.NULLS_LAST);
+                                       } else {
+                                               kinds.add(SqlKind.NULLS_FIRST);
+                                       }
+                               }
+                               return node;
+               }
+       }
+
+       /**
+        * Creates the window bound for one side of the OVER window.
+        *
+        * @param context supplies the type factory and the RelBuilder
+        * @param bound either a call to one of the UNBOUNDED_ROW / UNBOUNDED_RANGE / CURRENT_ROW /
+        *     CURRENT_RANGE definitions, or a value literal carrying the offset
+        * @param sqlKind {@link SqlKind#PRECEDING} or {@link SqlKind#FOLLOWING}, as passed by the caller
+        * @return the window bound
+        * @throws IllegalArgumentException if {@code bound} is a call to any other function
+        * @throws TableException if {@code bound} is neither a call nor a value literal
+        */
+       private static RexWindowBound createBound(ConvertContext context, Expression bound, SqlKind sqlKind) {
+               if (bound instanceof CallExpression) {
+                       FunctionDefinition func = ((CallExpression) bound).getFunctionDefinition();
+                       if (BuiltInFunctionDefinitions.UNBOUNDED_ROW.equals(func)
+                               || BuiltInFunctionDefinitions.UNBOUNDED_RANGE.equals(func)) {
+                               SqlNode unbounded = sqlKind.equals(SqlKind.PRECEDING)
+                                       ? SqlWindow.createUnboundedPreceding(SqlParserPos.ZERO)
+                                       : SqlWindow.createUnboundedFollowing(SqlParserPos.ZERO);
+                               return RexWindowBound.create(unbounded, null);
+                       } else if (BuiltInFunctionDefinitions.CURRENT_ROW.equals(func)
+                               || BuiltInFunctionDefinitions.CURRENT_RANGE.equals(func)) {
+                               SqlNode currentRow = SqlWindow.createCurrentRow(SqlParserPos.ZERO);
+                               return RexWindowBound.create(currentRow, null);
+                       } else {
+                               throw new IllegalArgumentException("Unexpected expression: " + bound);
+                       }
+               } else if (bound instanceof ValueLiteralExpression) {
+                       RelDataType returnType = context.getTypeFactory()
+                               .createFieldTypeFromLogicalType(new DecimalType(true, 19, 0));
+                       // postfix operator named after the bound kind; its return type is taken from operand 0
+                       SqlOperator sqlOperator = new SqlPostfixOperator(
+                               sqlKind.name(),
+                               sqlKind,
+                               2,
+                               new OrdinalReturnTypeInference(0),
+                               null,
+                               null);
+                       // NOTE(review): the SqlNode side always carries the literal "1" while the real
+                       // offset travels in the RexNode below - presumably a placeholder, confirm.
+                       SqlNode[] operands = new SqlNode[] { SqlLiteral.createExactNumeric("1", SqlParserPos.ZERO) };
+                       SqlNode node = new SqlBasicCall(sqlOperator, operands, SqlParserPos.ZERO);
+
+                       ValueLiteralExpression literalExpr = (ValueLiteralExpression) bound;
+                       // orElseGet: only build the generic fallback literal when no Double value is available
+                       RexNode literalRexNode = literalExpr.getValueAs(Double.class)
+                               .map(v -> context.getRelBuilder().literal(BigDecimal.valueOf(v)))
+                               .orElseGet(() -> context.getRelBuilder().literal(extractValue(literalExpr, Object.class)));
+
+                       List<RexNode> expressions = new ArrayList<>();
+                       expressions.add(literalRexNode);
+                       RexNode rexNode = context.getRelBuilder().getRexBuilder().makeCall(
+                               returnType, sqlOperator, expressions);
+                       return RexWindowBound.create(node, rexNode);
+               } else {
+                       throw new TableException("Unexpected expression: " + bound);
+               }
+       }
+}
diff --git 
a/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ScalarFunctionConvertRule.java
 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ScalarFunctionConvertRule.java
new file mode 100644
index 0000000..8921092
--- /dev/null
+++ 
b/flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/planner/expressions/converter/ScalarFunctionConvertRule.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.expressions.converter;
+
+import org.apache.flink.table.expressions.CallExpression;
+import org.apache.flink.table.functions.FunctionDefinition;
+import org.apache.flink.table.functions.ScalarFunction;
+import org.apache.flink.table.functions.ScalarFunctionDefinition;
+import org.apache.flink.table.planner.functions.utils.UserDefinedFunctionUtils;
+
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlFunction;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter.toRexNodes;
+
+/**
+ * A {@link CallExpressionConvertRule} that handles calls whose function definition is a
+ * {@link ScalarFunctionDefinition}.
+ */
+public class ScalarFunctionConvertRule implements CallExpressionConvertRule {
+
+       /**
+        * Converts a scalar function call into a call of the SQL function created for it.
+        *
+        * @param call the call expression to convert
+        * @param context supplies the type factory, the RelBuilder and the conversion of children
+        * @return the converted call, or {@link Optional#empty()} if the definition of {@code call}
+        *     is not a {@link ScalarFunctionDefinition}
+        */
+       @Override
+       public Optional<RexNode> convert(CallExpression call, ConvertContext context) {
+               FunctionDefinition definition = call.getFunctionDefinition();
+               if (!(definition instanceof ScalarFunctionDefinition)) {
+                       // this rule only applies to scalar function definitions
+                       return Optional.empty();
+               }
+
+               ScalarFunction scalarFunction = ((ScalarFunctionDefinition) definition).getScalarFunction();
+               SqlFunction sqlFunction = UserDefinedFunctionUtils.createScalarSqlFunction(
+                       scalarFunction.functionIdentifier(),
+                       scalarFunction.toString(),
+                       scalarFunction,
+                       context.getTypeFactory());
+               return Optional.of(
+                       context.getRelBuilder().call(sqlFunction, toRexNodes(context, call.getChildren())));
+       }
+}
diff --git a/tools/maven/suppressions.xml b/tools/maven/suppressions.xml
index 7635914..4a56880 100644
--- a/tools/maven/suppressions.xml
+++ b/tools/maven/suppressions.xml
@@ -35,9 +35,9 @@ under the License.
                <suppress
                        
files="org[\\/]apache[\\/]flink[\\/]streaming[\\/]python[\\/]connectors[\\/].*.java"
                checks="MethodNameCheck|ParameterName"/>
-           <!-- RexNodeConverter has to use guava directly -->
+           <!-- Have to use guava directly -->
            <suppress
-                       files="RexNodeConverter.java"
+                       files="OverConvertRule.java|CustomizedConvertRule.java"
                        checks="IllegalImport"/>
                <!-- Cassandra connectors have to use guava directly -->
                <suppress

Reply via email to