xuyangzhong commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1467163962

##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1073,6 +1073,25 @@ void testNamedArgumentsTableFunction() throws Exception {
         assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
     }
+    @Test
+    void testNamedArgumentsCatalogTableFunctionWithOptionalArguments() throws Exception {

Review Comment:
   Nit: remove `Catalog` from the test name.

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java:
##########
@@ -194,14 +194,17 @@ private static <T, H extends Annotation> T defaultAsNull(
                     "Argument and input hints cannot be declared in the same function hint.");
         }
+        Boolean[] argumentOptionals = null;

Review Comment:
   Just curious: can this array contain a null `Boolean` element?

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala:
##########
@@ -237,6 +238,9 @@ object StringCallGen {
         val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
         generateNonNullField(returnType, currentDatabase)
+      case DEFAULT =>

Review Comment:
   `StringCallGen` contains the functions that take `StringData` as input or output parameters, so `DEFAULT` does not really belong here. How about handling it directly in `ExprCodeGenerator#generateCallExpression` and adding `public static final SqlSpecialOperator DEFAULT = SqlStdOperatorTable.DEFAULT;` to `FlinkSqlOperatorTable`?

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##########
@@ -67,7 +73,24 @@ static FunctionSignatureTemplate of(
             throw extractionError(
                     "Argument name conflict, there are at least two argument names that are the same.");
         }
-        return new FunctionSignatureTemplate(argumentTemplates, isVarArgs, argumentNames);
+        if (argumentOptionals != null && argumentOptionals.length != argumentTemplates.size()) {
+            throw extractionError(
+                    "Mismatch between number of argument optionals '%s' and argument types '%s'.",
+                    argumentOptionals.length, argumentTemplates.size());
+        }
+        if (argumentOptionals != null) {
+            for (int i = 0; i < argumentTemplates.size(); i++) {
+                DataType dataType = argumentTemplates.get(i).dataType;
+                if (dataType != null
+                        && !dataType.getLogicalType().isNullable()

Review Comment:
   Nice validation!

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -116,7 +127,17 @@ public Consistency getConsistency() {
     @Override
     public boolean isOptional(int i) {
-        return false;
+        Optional<List<Boolean>> optionalArguments = typeInference.getOptionalArguments();
+        if (optionalArguments.isPresent()) {
+            return optionalArguments.get().get(i);

Review Comment:
   Nit: could a `NullPointerException` occur here when the `Boolean` element is unboxed to `boolean`?

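To illustrate the unboxing concern, here is a minimal stand-alone sketch. It is not Flink code: the class `OptionalFlagSketch` and both method names are hypothetical, and the null-tolerant variant is only one possible way to handle a missing flag (here assumed to mean "not optional").

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

/** Hypothetical stand-alone sketch; not part of Flink. */
final class OptionalFlagSketch {

    /** Same shape as the code under review: auto-unboxing fails on a null element. */
    static boolean isOptionalUnsafe(Optional<List<Boolean>> optionalArguments, int i) {
        if (optionalArguments.isPresent()) {
            // Boolean -> boolean conversion throws NullPointerException if the element is null.
            return optionalArguments.get().get(i);
        }
        return false;
    }

    /** Null-tolerant variant (assumption: a null flag means "not optional"). */
    static boolean isOptionalSafe(Optional<List<Boolean>> optionalArguments, int i) {
        return optionalArguments
                .map(flags -> Boolean.TRUE.equals(flags.get(i)))
                .orElse(false);
    }

    public static void main(String[] args) {
        // Arrays.asList permits null elements, unlike List.of.
        Optional<List<Boolean>> flags = Optional.of(Arrays.asList(true, null, false));
        System.out.println(isOptionalSafe(flags, 0));
        System.out.println(isOptionalSafe(flags, 1));
        try {
            isOptionalUnsafe(flags, 1);
        } catch (NullPointerException e) {
            System.out.println("unboxing a null Boolean throws NullPointerException");
        }
    }
}
```

If the extraction step guarantees that every element is non-null, the original code is fine and no guard is needed.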
##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -116,7 +127,17 @@ public Consistency getConsistency() {
     @Override
     public boolean isOptional(int i) {
-        return false;
+        Optional<List<Boolean>> optionalArguments = typeInference.getOptionalArguments();
+        if (optionalArguments.isPresent()) {
+            return optionalArguments.get().get(i);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFixedParameters() {
+        return typeInference.getTypedArguments().isPresent();

Review Comment:
   This is fine as things currently stand. (I wonder whether `isVarArgs` on `FunctionHint` could be used here instead?)

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java:
##########
@@ -70,13 +73,30 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) {
         SqlCallBinding sqlCallBinding =
                 new SqlCallBinding(context.getSqlValidator(), null, callProcedure);
+
+        TypeInference typeInference =
+                procedureDefinition.getTypeInference(
+                        context.getCatalogManager().getDataTypeFactory());
+        List<RexNode> reducedOperands = reduceOperands(sqlCallBinding.operands(), context);

Review Comment:
   If we consolidate the logic that corrects the return type of `DEFAULT`, would the changes in this class still be necessary?

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
+    if (!hasAssignment(call)) {
+      return Array.empty
+    }
+
+    call.getOperator.getOperandTypeChecker
+      .asInstanceOf[SqlOperandMetadata]
+      .paramTypes(validator.getTypeFactory)
+      .asScala
+      .toArray
+  }
+
+  def hasAssignment(call: SqlCall): Boolean = {

Review Comment:
   Ditto. BTW, what about noting that this function is copied from `SqlCallBinding#hasAssignment`?

##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -6101,6 +6105,13 @@ private void translateAgg(
                     }
                 }
                 RexNode convertedExpr = bb.convertExpression(operand);
+                if (SqlKind.DEFAULT == convertedExpr.getKind()) {
+                    convertedExpr =
+                            ((RexCall) convertedExpr)
+                                    .clone(
+                                            defaultTypes[index],
+                                            ((RexCall) convertedExpr).operands);
+                }

Review Comment:
   Nit: update the `// ----- FLINK MODIFICATION END -----` marker.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
+    if (!hasAssignment(call)) {
+      return Array.empty
+    }
+
+    call.getOperator.getOperandTypeChecker
+      .asInstanceOf[SqlOperandMetadata]
+      .paramTypes(validator.getTypeFactory)
+      .asScala
+      .toArray
+  }
+
+  def hasAssignment(call: SqlCall): Boolean = {

Review Comment:
   Mark this as private.

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -465,20 +467,38 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
           call.getOperands.get(1).asInstanceOf[RexLiteral])
     }
+    // replace default node with right type.
+    val operands = new util.ArrayList[RexNode](call.operands)
+
     // convert operands and help giving untyped NULL literals a type
-    val operands = call.getOperands.zipWithIndex.map {
+    val expressions = call.getOperands.zipWithIndex.map {
       // this helps e.g. for AS(null)
       // we might need to extend this logic in case some rules do not create typed NULLs
       case (operandLiteral: RexLiteral, 0)
           if operandLiteral.getType.getSqlTypeName == SqlTypeName.NULL &&
             call.getOperator.getReturnTypeInference == ReturnTypes.ARG0 =>
         generateNullLiteral(resultType)
-
+      case (rexCall: RexCall, i)
+          if (rexCall.getKind == SqlKind.DEFAULT && call.getOperator
+            .isInstanceOf[BridgingSqlFunction]) => {
+        val sqlFunction = call.getOperator.asInstanceOf[BridgingSqlFunction]
+        val typeInference = sqlFunction.getTypeInference
+        val typeFactory = sqlFunction.getTypeFactory
+        if (typeInference.getTypedArguments.isPresent) {
+          val dataType = typeInference.getTypedArguments.get().get(i).getLogicalType
+          operands.set(
+            i,
+            rexCall.clone(typeFactory.createFieldTypeFromLogicalType(dataType), rexCall.operands))

Review Comment:
   The return type of the `DEFAULT` function is overridden both in SqlToRel and here, which seems too scattered. Can we consolidate this into the SqlToRel phase only?

##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {

Review Comment:
   Nit: add a comment describing this function.

##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -101,7 +104,15 @@ public boolean checkOperandTypes(SqlCallBinding callBinding, boolean throwOnFail
     @Override
     public SqlOperandCountRange getOperandCountRange() {
-        return countRange;
+        if (typeInference.getOptionalArguments().isPresent()
+                && typeInference.getOptionalArguments().get().stream()
+                        .anyMatch(Boolean::booleanValue)) {
+            ArgumentCount argumentCount = ConstantArgumentCount.between(0, countRange.getMax());

Review Comment:
   I think the minimum count should be the number of `false` entries in `typeInference.getOptionalArguments()`, right?

##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:
##########
@@ -1810,4 +1844,30 @@ public String eval(String f1, String f2) {
             return "";
         }
     }
+
+    private static class ArgumentHintScalarFunctionNotNullTypeWithOptionals extends ScalarFunction {
+        @FunctionHint(
+                argument = {
+                    @ArgumentHint(
+                            type = @DataTypeHint("STRING NOT NULL"),
+                            name = "f1",
+                            isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2", isOptional = true)
+                })
+        public String eval(String f1, Integer f2) {

Review Comment:
   What happens when a parameter marked `isOptional = true` has a primitive type, such as `int`? Can we check this early and give a clear error message, rather than potentially hitting strange errors during code generation or reflection?

##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java:
##########
@@ -286,6 +305,15 @@ private static void configureTypedArguments(
     private static TypeStrategy translateResultTypeStrategy(
             Map<FunctionSignatureTemplate, FunctionResultTemplate> resultMapping) {
+        if (resultMapping.size() == 1) {

Review Comment:
   This part of the code should not be modified. Instead, we should change the `argumentDataTypes#size()` logic in the `OperatorBindingCallContext` constructor:

   ```
   this.argumentDataTypes =
           new AbstractList<DataType>() {
               @Override
               public DataType get(int pos) {
                   if (binding instanceof SqlCallBinding) {
                       SqlCallBinding sqlCallBinding = (SqlCallBinding) binding;
                       List<SqlNode> operands = sqlCallBinding.operands();
                       final RelDataType relDataType =
                               sqlCallBinding
                                       .getValidator()
                                       .deriveType(
                                               sqlCallBinding.getScope(), operands.get(pos));
                       final LogicalType logicalType =
                               FlinkTypeFactory.toLogicalType(relDataType);
                       return TypeConversions.fromLogicalToDataType(logicalType);
                   } else {
                       final LogicalType logicalType = toLogicalType(binding.getOperandType(pos));
                       return fromLogicalToDataType(logicalType);
                   }
               }

               @Override
               public int size() {
                   if (binding instanceof SqlCallBinding) {
                       return ((SqlCallBinding) binding).operands().size();
                   } else {
                       return binding.getOperandCount();
                   }
               }
           };
   ```
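
Regarding the `getOperandCountRange` comment above, the suggested minimum can be sketched as follows. This is a stand-alone illustration, not Flink or Calcite code: the class `OperandCountSketch` and the method `operandCountRange` are hypothetical, and the range is returned as a plain `{min, max}` array instead of an `ArgumentCount`.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-alone sketch; not part of Flink or Calcite. */
final class OperandCountSketch {

    /**
     * Returns {min, max}: every non-optional argument must be supplied,
     * while optional ones may be omitted.
     */
    static int[] operandCountRange(List<Boolean> optionalArguments) {
        int required = 0;
        for (Boolean optional : optionalArguments) {
            // Assumption: a null flag is treated as "not optional", i.e. required.
            if (!Boolean.TRUE.equals(optional)) {
                required++;
            }
        }
        return new int[] {required, optionalArguments.size()};
    }

    public static void main(String[] args) {
        int[] range = operandCountRange(Arrays.asList(false, true, true));
        System.out.println(range[0] + ".." + range[1]);
    }
}
```

With the flags `false, true, true`, the range is 1 to 3 rather than 0 to 3, so a call that omits the required first argument is rejected by the count check.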