beyond1920 commented on a change in pull request #8977: [FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner support add kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300521116
 
 

 ##########
 File path: flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##########
 @@ -107,94 +341,295 @@ public RexNode visit(CallExpression call) {
                                .collect(Collectors.toList());
        }
 
-       private RexNode translateScalarCall(FunctionDefinition def, List<Expression> children) {
+       private RexNode convert(SqlOperator sqlOperator, List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return relBuilder.call(sqlOperator, childrenRexNode);
+       }
 
-               if (def.equals(BuiltInFunctionDefinitions.CAST)) {
-                       RexNode child = children.get(0).accept(this);
-                       TypeLiteralExpression type = (TypeLiteralExpression) children.get(1);
-                       return relBuilder.getRexBuilder().makeAbstractCast(
-                                       typeFactory.createFieldTypeFromLogicalType(
-                                                       type.getOutputDataType().getLogicalType().copy(child.getType().isNullable())),
-                                       child);
-               } else if (def.equals(BuiltInFunctionDefinitions.REINTERPRET_CAST)) {
-                       RexNode child = children.get(0).accept(this);
-                       TypeLiteralExpression type = (TypeLiteralExpression) children.get(1);
-                       RexNode checkOverflow = children.get(2).accept(this);
-                       return relBuilder.getRexBuilder().makeReinterpretCast(
-                                       typeFactory.createFieldTypeFromLogicalType(
-                                                       type.getOutputDataType().getLogicalType().copy(child.getType().isNullable())),
-                                       child,
-                                       checkOverflow);
+       private RexNode convertArrayElement(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return relBuilder.call(FlinkSqlOperatorTable.ELEMENT, childrenRexNode);
+       }
+
+       private RexNode convertOrderAsc(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return childrenRexNode.get(0);
+       }
+
+       private RexNode convertTimestampDiff(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return relBuilder.call(FlinkSqlOperatorTable.TIMESTAMP_DIFF, childrenRexNode.get(0), childrenRexNode.get(2),
+                               childrenRexNode.get(1));
+       }
+
+       private RexNode convertIsNull(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return relBuilder.isNull(childrenRexNode.get(0));
+       }
+
+       private RexNode convertNotBetween(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return relBuilder.or(
+                               relBuilder.call(FlinkSqlOperatorTable.LESS_THAN, childrenRexNode),
+                               relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN, childrenRexNode));
+       }
+
+       private RexNode convertBetween(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               return relBuilder.and(
+                               relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, childrenRexNode),
+                               relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, childrenRexNode));
+       }
+
+       private RexNode convertCeil(List<Expression> children) {
+               Preconditions.checkArgument(children.size() == 1 || children.size() == 2);
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               if (children.size() == 1) {
+                       return relBuilder.call(FlinkSqlOperatorTable.CEIL, childrenRexNode);
+               } else {
+                       return relBuilder.call(FlinkSqlOperatorTable.CEIL, childrenRexNode.get(1), childrenRexNode.get(0));
                }
+       }
 
-               List<RexNode> child = convertCallChildren(children);
-               if (BuiltInFunctionDefinitions.IF.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.CASE, child);
-               } else if (BuiltInFunctionDefinitions.IS_NULL.equals(def)) {
-                       return relBuilder.isNull(child.get(0));
-               } else if (BuiltInFunctionDefinitions.PLUS.equals(def)) {
-                       if (isCharacterString(toLogicalType(child.get(0).getType()))) {
-                               return relBuilder.call(
-                                               FlinkSqlOperatorTable.CONCAT,
-                                               child.get(0),
-                                               relBuilder.cast(child.get(1), VARCHAR));
-                       } else if (isCharacterString(toLogicalType(child.get(1).getType()))) {
-                               return relBuilder.call(
-                                               FlinkSqlOperatorTable.CONCAT,
-                                               relBuilder.cast(child.get(0), VARCHAR),
-                                               child.get(1));
-                       } else if (isTimeInterval(toLogicalType(child.get(0).getType())) &&
-                                       child.get(0).getType() == child.get(1).getType()) {
-                               return relBuilder.call(FlinkSqlOperatorTable.PLUS, child);
-                       } else if (isTimeInterval(toLogicalType(child.get(0).getType()))
-                                       && isTemporal(toLogicalType(child.get(1).getType()))) {
-                               // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
-                               // we manually switch them here
-                               return relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, child);
-                       } else if (isTemporal(toLogicalType(child.get(0).getType())) &&
-                                       isTemporal(toLogicalType(child.get(1).getType()))) {
-                               return relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, child);
-                       } else {
-                               return relBuilder.call(FlinkSqlOperatorTable.PLUS, child);
-                       }
-               } else if (BuiltInFunctionDefinitions.MINUS.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.MINUS, child);
-               } else if (BuiltInFunctionDefinitions.EQUALS.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.EQUALS, child);
-               } else if (BuiltInFunctionDefinitions.DIVIDE.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.DIVIDE, child);
-               } else if (BuiltInFunctionDefinitions.LESS_THAN.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.LESS_THAN, child);
-               } else if (BuiltInFunctionDefinitions.GREATER_THAN.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN, child);
-               } else if (BuiltInFunctionDefinitions.OR.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.OR, child);
-               } else if (BuiltInFunctionDefinitions.CONCAT.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.CONCAT, child);
-               } else if (InternalFunctionDefinitions.THROW_EXCEPTION.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.THROW_EXCEPTION, child);
-               } else if (BuiltInFunctionDefinitions.AND.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.AND, child);
-               } else if (BuiltInFunctionDefinitions.NOT.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.NOT, child);
-               } else if (BuiltInFunctionDefinitions.TIMES.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.MULTIPLY, child);
-               } else if (BuiltInFunctionDefinitions.MOD.equals(def)) {
-                       return relBuilder.call(FlinkSqlOperatorTable.MOD, child);
-               } else if (def instanceof ScalarFunctionDefinition) {
-                       ScalarFunction scalarFunc = ((ScalarFunctionDefinition) def).getScalarFunction();
-                       SqlFunction sqlFunction = UserDefinedFunctionUtils.createScalarSqlFunction(
-                               // TODO use the name under which the function is registered
-                               scalarFunc.functionIdentifier(),
-                               scalarFunc.toString(),
-                               scalarFunc,
-                               typeFactory);
-                       return relBuilder.call(sqlFunction, child);
+       private RexNode convertFloor(List<Expression> children) {
+               Preconditions.checkArgument(children.size() == 1 || children.size() == 2);
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               if (children.size() == 1) {
+                       return relBuilder.call(FlinkSqlOperatorTable.FLOOR, childrenRexNode);
+               } else {
+                       return relBuilder.call(FlinkSqlOperatorTable.FLOOR, childrenRexNode.get(1), childrenRexNode.get(0));
+               }
+       }
+
+       private RexNode convertArray(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               ArrayType arrayType = new ArrayType(toLogicalType(childrenRexNode.get(0).getType()));
+               // TODO get type from CallExpression directly
+               RelDataType relDataType = typeFactory.createFieldTypeFromLogicalType(arrayType);
+               return relBuilder.getRexBuilder().makeCall(relDataType, FlinkSqlOperatorTable.ARRAY_VALUE_CONSTRUCTOR, childrenRexNode);
+       }
+
+       private RexNode convertMap(List<Expression> children) {
+               Preconditions.checkArgument(!children.isEmpty() && children.size() % 2 == 0);
+               // TODO get type from CallExpression directly
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               RelDataType keyType = childrenRexNode.get(0).getType();
+               RelDataType valueType = childrenRexNode.get(childrenRexNode.size() - 1).getType();
+               RelDataType mapType = typeFactory.createMapType(keyType, valueType);
+               return relBuilder.getRexBuilder().makeCall(mapType, FlinkSqlOperatorTable.MAP_VALUE_CONSTRUCTOR, childrenRexNode);
+       }
+
+       private RexNode convertRow(List<Expression> children) {
+               // TODO get type from CallExpression directly
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               LogicalType[] childTypes = childrenRexNode.stream().map(rexNode -> toLogicalType(rexNode.getType()))
+                               .toArray(LogicalType[]::new);
+               RowType rowType = RowType.of(childTypes);
+               RelDataType relDataType = typeFactory.createFieldTypeFromLogicalType(rowType);
+               return relBuilder.getRexBuilder().makeCall(relDataType, FlinkSqlOperatorTable.ROW, childrenRexNode);
+       }
+
+       private RexNode convertTemporalOverlaps(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               // Standard conversion of the OVERLAPS operator.
+               // Source: [[org.apache.calcite.sql2rel.StandardConvertletTable#convertOverlaps()]]
+               RexNode leftTimePoint = childrenRexNode.get(0);
+               RexNode leftTemporal = childrenRexNode.get(1);
+               RexNode rightTimePoint = childrenRexNode.get(2);
+               RexNode rightTemporal = childrenRexNode.get(3);
+               RexNode convLeftT;
+               if (isTimeInterval(toLogicalType(leftTemporal.getType()))) {
+                       convLeftT = relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, leftTimePoint, leftTemporal);
+               } else {
+                       convLeftT = leftTemporal;
+               }
+               // sort end points into start and end, such that (s0 <= e0) and (s1 <= e1).
+               RexNode leftLe = relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, leftTimePoint, convLeftT);
+               RexNode s0 = relBuilder.call(FlinkSqlOperatorTable.CASE, leftLe, leftTimePoint, convLeftT);
+               RexNode e0 = relBuilder.call(FlinkSqlOperatorTable.CASE, leftLe, convLeftT, leftTimePoint);
+               RexNode convRightT;
+               if (isTimeInterval(toLogicalType(rightTemporal.getType()))) {
+                       convRightT = relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, rightTimePoint, rightTemporal);
+               } else {
+                       convRightT = rightTemporal;
+               }
+               RexNode rightLe = relBuilder.call(FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL, rightTimePoint, convRightT);
+               RexNode s1 = relBuilder.call(FlinkSqlOperatorTable.CASE, rightLe, rightTimePoint, convRightT);
+               RexNode e1 = relBuilder.call(FlinkSqlOperatorTable.CASE, rightLe, convRightT, rightTimePoint);
+
+               // (e0 >= s1) AND (e1 >= s0)
+               RexNode leftPred = relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e0, s1);
+               RexNode rightPred = relBuilder.call(FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL, e1, s0);
+               return relBuilder.call(FlinkSqlOperatorTable.AND, leftPred, rightPred);
+       }
+
+       private RexNode convertPlus(List<Expression> children) {
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               if (isCharacterString(toLogicalType(childrenRexNode.get(0).getType()))) {
+                       return relBuilder.call(
+                                       FlinkSqlOperatorTable.CONCAT,
+                                       childrenRexNode.get(0),
+                                       relBuilder.cast(childrenRexNode.get(1), VARCHAR));
+               } else if (isCharacterString(toLogicalType(childrenRexNode.get(1).getType()))) {
+                       return relBuilder.call(
+                                       FlinkSqlOperatorTable.CONCAT,
+                                       relBuilder.cast(childrenRexNode.get(0), VARCHAR),
+                                       childrenRexNode.get(1));
+               } else if (isTimeInterval(toLogicalType(childrenRexNode.get(0).getType())) &&
+                               childrenRexNode.get(0).getType() == childrenRexNode.get(1).getType()) {
+                       return relBuilder.call(FlinkSqlOperatorTable.PLUS, childrenRexNode);
+               } else if (isTimeInterval(toLogicalType(childrenRexNode.get(0).getType()))
+                               && isTemporal(toLogicalType(childrenRexNode.get(1).getType()))) {
+                       // Calcite has a bug that can't apply INTERVAL + DATETIME (INTERVAL at left)
+                       // we manually switch them here
+                       return relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, childrenRexNode.get(1), childrenRexNode.get(0));
+               } else if (isTemporal(toLogicalType(childrenRexNode.get(0).getType())) &&
+                               isTemporal(toLogicalType(childrenRexNode.get(1).getType()))) {
+                       return relBuilder.call(FlinkSqlOperatorTable.DATETIME_PLUS, childrenRexNode);
+               } else {
+                       return relBuilder.call(FlinkSqlOperatorTable.PLUS, childrenRexNode);
+               }
+       }
+
+       private RexNode convertReplace(List<Expression> children) {
+               Preconditions.checkArgument(children.size() == 2 || children.size() == 3);
+               List<RexNode> childrenRexNode = convertCallChildren(children);
+               if (children.size() == 2) {
+                       return relBuilder.call(
+                                       FlinkSqlOperatorTable.REPLACE,
+                                       childrenRexNode.get(0),
+                                       childrenRexNode.get(1),
+                                       relBuilder.call(FlinkSqlOperatorTable.CHAR_LENGTH, childrenRexNode.get(0)));
                } else {
-                       throw new UnsupportedOperationException(def.toString());
+                       return relBuilder.call(FlinkSqlOperatorTable.REPLACE, childrenRexNode);
                }
        }
 
+       private RexNode convertOver(List<Expression> children) {
+               List<Expression> args = children;
+               Expression agg = args.get(0);
+               SqlAggFunction aggFunc = agg.accept(new SqlAggFunctionVisitor(typeFactory));
+               RelDataType aggResultType = typeFactory.createFieldTypeFromLogicalType(
+                               fromDataTypeToLogicalType(((ResolvedExpression) agg).getOutputDataType()));
+
+               // assemble exprs by agg children
+               List<RexNode> aggExprs = agg.getChildren().stream().map(expr -> expr.accept(this))
+                               .collect(Collectors.toList());
+
+               // assemble order by key
+               Expression orderKeyExpr = args.get(1);
+               Set<SqlKind> kinds = new HashSet<>();
+               RexNode collationRexNode = createCollation(orderKeyExpr.accept(this), RelFieldCollation.Direction.ASCENDING,
+                               null, kinds);
+               ImmutableList<RexFieldCollation> orderKey = ImmutableList
+                               .of(new RexFieldCollation(collationRexNode, kinds));
+
+               // assemble partition by keys
+               List<RexNode> partitionKeys = args.subList(4, args.size()).stream().map(expr -> expr.accept(this))
+                               .collect(Collectors.toList());
+               // assemble bounds
+               Expression preceding = args.get(2);
+               boolean isPhysical = ((ResolvedExpression) preceding).getOutputDataType().equals(DataTypes.BIGINT());
+               Expression following = args.get(3);
+               RexWindowBound lowerBound = createBound(preceding, SqlKind.PRECEDING);
+               RexWindowBound upperBound = createBound(following, SqlKind.FOLLOWING);
+
+               // build RexOver
+               return relBuilder.getRexBuilder().makeOver(
+                               aggResultType,
+                               aggFunc,
+                               aggExprs,
+                               partitionKeys,
+                               orderKey,
+                               lowerBound,
+                               upperBound,
+                               isPhysical,
+                               true,
+                               false,
+                               false);
+       }
+
+       private RexNode convertAs(List<Expression> children) {
+               String name = extractValue((ValueLiteralExpression) children.get(1), String.class);
+               RexNode child = children.get(0).accept(this);
+               return relBuilder.alias(child, name);
+       }
+
+       private RexNode convertTrim(List<Expression> children) {
+               ValueLiteralExpression removeLeadingExpr = (ValueLiteralExpression) children.get(0);
+               Boolean removeLeading = extractValue(removeLeadingExpr, Boolean.class);
+               ValueLiteralExpression removeTrailingExpr = (ValueLiteralExpression) children.get(1);
+               Boolean removeTrailing = extractValue(removeTrailingExpr, Boolean.class);
+               RexNode trimString = children.get(2).accept(this);
+               RexNode str = children.get(3).accept(this);
+               Enum trimMode;
+               if (removeLeading && removeTrailing) {
+                       trimMode = SqlTrimFunction.Flag.BOTH;
+               } else if (removeLeading) {
+                       trimMode = SqlTrimFunction.Flag.LEADING;
+               } else if (removeTrailing) {
+                       trimMode = SqlTrimFunction.Flag.TRAILING;
+               } else {
+                       throw new IllegalArgumentException("Unsupported trim mode.");
+               }
+               return relBuilder.call(
+                               FlinkSqlOperatorTable.TRIM,
+                               relBuilder.getRexBuilder().makeFlag(trimMode),
+                               trimString,
+                               str);
+       }
+
+       private RexNode ConvertGet(List<Expression> children) {
 
 Review comment:
   Sorry for that; @JingsongLi fixed the compile error in #8689.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
