JingsongLi commented on a change in pull request #8977: 
[FLINK-13071][table-planner-blink] QueryOperationConverter in Blink planner 
supports all kinds of QueryOperations.
URL: https://github.com/apache/flink/pull/8977#discussion_r300213763
 
 

 ##########
 File path: 
flink-table/flink-table-planner-blink/src/main/java/org/apache/flink/table/expressions/RexNodeConverter.java
 ##########
 @@ -79,25 +112,196 @@
        private final RelBuilder relBuilder;
        private final FlinkTypeFactory typeFactory;
 
+       private static final int DECIMAL_PRECISION_NEEDED_FOR_LONG = 19;
+
+       /**
+        * The mapping only keeps part of FunctionDefinitions, which could be 
converted to SqlOperator in a very simple
+        * way.
+        */
+       private static final Map<FunctionDefinition, SqlOperator> 
SIMPLE_DEF_SQL_OPERATOR_MAPPING = new HashMap<>();
+
+       static {
+               // logic functions
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AND, 
FlinkSqlOperatorTable.AND);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OR, 
FlinkSqlOperatorTable.OR);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT, 
FlinkSqlOperatorTable.NOT);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IF, 
FlinkSqlOperatorTable.CASE);
+
+               // comparison functions
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EQUALS, 
FlinkSqlOperatorTable.EQUALS);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.GREATER_THAN, 
FlinkSqlOperatorTable.GREATER_THAN);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               
.put(BuiltInFunctionDefinitions.GREATER_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.GREATER_THAN_OR_EQUAL);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LESS_THAN, 
FlinkSqlOperatorTable.LESS_THAN);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               
.put(BuiltInFunctionDefinitions.LESS_THAN_OR_EQUAL, 
FlinkSqlOperatorTable.LESS_THAN_OR_EQUAL);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.NOT_EQUALS, 
FlinkSqlOperatorTable.NOT_EQUALS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NULL, 
FlinkSqlOperatorTable.IS_NULL);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_NULL, 
FlinkSqlOperatorTable.IS_NOT_NULL);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_TRUE, 
FlinkSqlOperatorTable.IS_TRUE);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_FALSE, 
FlinkSqlOperatorTable.IS_FALSE);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.IS_NOT_TRUE, 
FlinkSqlOperatorTable.IS_NOT_TRUE);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.IS_NOT_FALSE, 
FlinkSqlOperatorTable.IS_NOT_FALSE);
+
+               // string functions
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CHAR_LENGTH, 
FlinkSqlOperatorTable.CHAR_LENGTH);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.INIT_CAP, 
FlinkSqlOperatorTable.INITCAP);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LIKE, 
FlinkSqlOperatorTable.LIKE);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOWER, 
FlinkSqlOperatorTable.LOWER);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIMILAR, 
FlinkSqlOperatorTable.SIMILAR_TO);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SUBSTRING, 
FlinkSqlOperatorTable.SUBSTRING);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UPPER, 
FlinkSqlOperatorTable.UPPER);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POSITION, 
FlinkSqlOperatorTable.POSITION);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.OVERLAY, 
FlinkSqlOperatorTable.OVERLAY);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT, 
FlinkSqlOperatorTable.CONCAT_FUNCTION);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CONCAT_WS, 
FlinkSqlOperatorTable.CONCAT_WS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LPAD, 
FlinkSqlOperatorTable.LPAD);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RPAD, 
FlinkSqlOperatorTable.RPAD);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.REGEXP_EXTRACT, 
FlinkSqlOperatorTable.REGEXP_EXTRACT);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.FROM_BASE64, 
FlinkSqlOperatorTable.FROM_BASE64);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TO_BASE64, 
FlinkSqlOperatorTable.TO_BASE64);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.UUID, 
FlinkSqlOperatorTable.UUID);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LTRIM, 
FlinkSqlOperatorTable.LTRIM);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RTRIM, 
FlinkSqlOperatorTable.RTRIM);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.REPEAT, 
FlinkSqlOperatorTable.REPEAT);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.REGEXP_REPLACE, 
FlinkSqlOperatorTable.REGEXP_REPLACE);
+
+               // math functions
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MINUS, 
FlinkSqlOperatorTable.MINUS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DIVIDE, 
FlinkSqlOperatorTable.DIVIDE);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TIMES, 
FlinkSqlOperatorTable.MULTIPLY);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ABS, 
FlinkSqlOperatorTable.ABS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EXP, 
FlinkSqlOperatorTable.EXP);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG10, 
FlinkSqlOperatorTable.LOG10);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG2, 
FlinkSqlOperatorTable.LOG2);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LN, 
FlinkSqlOperatorTable.LN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOG, 
FlinkSqlOperatorTable.LOG);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.POWER, 
FlinkSqlOperatorTable.POWER);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MOD, 
FlinkSqlOperatorTable.MOD);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SQRT, 
FlinkSqlOperatorTable.SQRT);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MINUS_PREFIX, 
FlinkSqlOperatorTable.UNARY_MINUS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIN, 
FlinkSqlOperatorTable.SIN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COS, 
FlinkSqlOperatorTable.COS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SINH, 
FlinkSqlOperatorTable.SINH);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TAN, 
FlinkSqlOperatorTable.TAN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TANH, 
FlinkSqlOperatorTable.TANH);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COT, 
FlinkSqlOperatorTable.COT);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ASIN, 
FlinkSqlOperatorTable.ASIN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ACOS, 
FlinkSqlOperatorTable.ACOS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ATAN, 
FlinkSqlOperatorTable.ATAN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ATAN2, 
FlinkSqlOperatorTable.ATAN2);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.COSH, 
FlinkSqlOperatorTable.COSH);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DEGREES, 
FlinkSqlOperatorTable.DEGREES);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RADIANS, 
FlinkSqlOperatorTable.RADIANS);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SIGN, 
FlinkSqlOperatorTable.SIGN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ROUND, 
FlinkSqlOperatorTable.ROUND);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.PI, 
FlinkSqlOperatorTable.PI);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.E, 
FlinkSqlOperatorTable.E);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.RAND, 
FlinkSqlOperatorTable.RAND);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.RAND_INTEGER, 
FlinkSqlOperatorTable.RAND_INTEGER);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.BIN, 
FlinkSqlOperatorTable.BIN);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.HEX, 
FlinkSqlOperatorTable.HEX);
+
+//             TODO
+//             
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.TRUNCATE, 
FlinkSqlOperatorTable.TRUNCATE);
+
+               // time functions
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.EXTRACT, 
FlinkSqlOperatorTable.EXTRACT);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.CURRENT_DATE, 
FlinkSqlOperatorTable.CURRENT_DATE);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               .put(BuiltInFunctionDefinitions.CURRENT_TIME, 
FlinkSqlOperatorTable.CURRENT_TIME);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               
.put(BuiltInFunctionDefinitions.CURRENT_TIMESTAMP, 
FlinkSqlOperatorTable.CURRENT_TIMESTAMP);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.LOCAL_TIME, 
FlinkSqlOperatorTable.LOCALTIME);
+               SIMPLE_DEF_SQL_OPERATOR_MAPPING
+                               
.put(BuiltInFunctionDefinitions.LOCAL_TIMESTAMP, 
FlinkSqlOperatorTable.LOCALTIMESTAMP);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.DATE_FORMAT, 
FlinkSqlOperatorTable.DATE_FORMAT);
+
+               // collection
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.AT, 
FlinkSqlOperatorTable.ITEM);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.CARDINALITY, 
FlinkSqlOperatorTable.CARDINALITY);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.ORDER_DESC, 
FlinkSqlOperatorTable.DESC);
+
+               // crypto hash
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.MD5, 
FlinkSqlOperatorTable.MD5);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA2, 
FlinkSqlOperatorTable.SHA2);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA224, 
FlinkSqlOperatorTable.SHA224);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA256, 
FlinkSqlOperatorTable.SHA256);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA384, 
FlinkSqlOperatorTable.SHA384);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA512, 
FlinkSqlOperatorTable.SHA512);
+               
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(BuiltInFunctionDefinitions.SHA1, 
FlinkSqlOperatorTable.SHA1);
+
+               // etc
+               // 
SIMPLE_DEF_SQL_OPERATOR_MAPPING.put(InternalFunctionDefinitions.THROW_EXCEPTION,
 FlinkSqlOperatorTable.THROW_EXCEPTION);
+
+       }
+
        public RexNodeConverter(RelBuilder relBuilder) {
                this.relBuilder = relBuilder;
                this.typeFactory = (FlinkTypeFactory) 
relBuilder.getRexBuilder().getTypeFactory();
        }
 
+       // TODO removed later after PlanExpression is merged
        public RexNode visit(UnresolvedCallExpression call) {
-               switch (call.getFunctionDefinition().getKind()) {
+               FunctionDefinition func = call.getFunctionDefinition();
+               switch (func.getKind()) {
                        case SCALAR:
-                               return 
translateScalarCall(call.getFunctionDefinition(), call.getChildren());
-                       default: throw new UnsupportedOperationException();
+                               if (func instanceof ScalarFunctionDefinition) {
+                                       ScalarFunction scalaFunc = 
((ScalarFunctionDefinition) func).getScalarFunction();
+                                       List<RexNode> child = 
convertCallChildren(call.getChildren());
+                                       SqlFunction sqlFunction = 
UserDefinedFunctionUtils.createScalarSqlFunction(
+                                                       
scalaFunc.functionIdentifier(),
+                                                       scalaFunc.toString(),
+                                                       scalaFunc,
+                                                       typeFactory);
+                                       return relBuilder.call(sqlFunction, 
child);
+                               } else {
+                                       return 
visitBuiltInFunc(call.getFunctionDefinition(), call.getChildren());
+                               }
+
+                       default:
+                               throw new UnsupportedOperationException();
                }
        }
 
        @Override
        public RexNode visit(CallExpression call) {
-               switch (call.getFunctionDefinition().getKind()) {
-                       case SCALAR:
-                               return 
translateScalarCall(call.getFunctionDefinition(), call.getChildren());
-                       default: throw new UnsupportedOperationException();
+               FunctionDefinition func = call.getFunctionDefinition();
+               if (func instanceof ScalarFunctionDefinition) {
+                       ScalarFunction scalaFunc = ((ScalarFunctionDefinition) 
func).getScalarFunction();
+                       List<RexNode> child = 
convertCallChildren(call.getChildren());
+                       SqlFunction sqlFunction = 
UserDefinedFunctionUtils.createScalarSqlFunction(
+                                       scalaFunc.functionIdentifier(),
+                                       scalaFunc.toString(),
+                                       scalaFunc,
+                                       typeFactory);
+                       return relBuilder.call(sqlFunction, child);
+               } else if (func instanceof TableFunctionDefinition) {
+                       throw new 
UnsupportedOperationException(func.toString());
 
 Review comment:
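   Why not? That is, why does a `TableFunctionDefinition` have to be rejected with an `UnsupportedOperationException` here?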

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to