xuyangzhong commented on code in PR #24183:
URL: https://github.com/apache/flink/pull/24183#discussion_r1467163962


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/FunctionITCase.java:
##########
@@ -1073,6 +1073,25 @@ void testNamedArgumentsTableFunction() throws Exception {
         assertThat(TestCollectionTableFactory.getResult()).containsExactlyInAnyOrder(sinkData);
     }
 
+    @Test
+    void testNamedArgumentsCatalogTableFunctionWithOptionalArguments() throws Exception {

Review Comment:
   nit: remove `Catalog`



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionTemplate.java:
##########
@@ -194,14 +194,17 @@ private static <T, H extends Annotation> T defaultAsNull(
                     "Argument and input hints cannot be declared in the same function hint.");
         }
 
+        Boolean[] argumentOptionals = null;

Review Comment:
   Just curious: can an element of this array ever be a null `Boolean`?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/StringCallGen.scala:
##########
@@ -237,6 +238,9 @@ object StringCallGen {
         val currentDatabase = ctx.addReusableQueryLevelCurrentDatabase()
         generateNonNullField(returnType, currentDatabase)
 
+      case DEFAULT =>

Review Comment:
   The class `StringCallGen` contains the functions that take `StringData` as an input or output parameter, so it is not quite appropriate to place the function `DEFAULT` here.
   How about handling it directly in `ExprCodeGenerator#generateCallExpression` and adding `public static final SqlSpecialOperator DEFAULT = SqlStdOperatorTable.DEFAULT;` to `FlinkSqlOperatorTable`?



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/FunctionSignatureTemplate.java:
##########
@@ -67,7 +73,24 @@ static FunctionSignatureTemplate of(
             throw extractionError(
                     "Argument name conflict, there are at least two argument names that are the same.");
         }
-        return new FunctionSignatureTemplate(argumentTemplates, isVarArgs, argumentNames);
+        if (argumentOptionals != null && argumentOptionals.length != argumentTemplates.size()) {
+            throw extractionError(
+                    "Mismatch between number of argument optionals '%s' and argument types '%s'.",
+                    argumentOptionals.length, argumentTemplates.size());
+        }
+        if (argumentOptionals != null) {
+            for (int i = 0; i < argumentTemplates.size(); i++) {
+                DataType dataType = argumentTemplates.get(i).dataType;
+                if (dataType != null
+                        && !dataType.getLogicalType().isNullable()

Review Comment:
   Nice validation!



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -116,7 +127,17 @@ public Consistency getConsistency() {
 
     @Override
     public boolean isOptional(int i) {
-        return false;
+        Optional<List<Boolean>> optionalArguments = typeInference.getOptionalArguments();
+        if (optionalArguments.isPresent()) {
+            return optionalArguments.get().get(i);

Review Comment:
   Nit: could a `NullPointerException` occur here when the `Boolean` element is unboxed to `boolean`?
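A minimal sketch of the concern, assuming the list may contain a null element. The class and method names below are hypothetical and not part of Flink; `isOptionalUnsafe` mirrors the shape of the code under review, while `isOptional` shows one possible null-safe variant.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not Flink API: illustrates Boolean -> boolean unboxing.
final class OptionalFlagLookup {

    // Same shape as the reviewed code: a null element is auto-unboxed
    // on return, which throws a NullPointerException.
    static boolean isOptionalUnsafe(Optional<List<Boolean>> optionalArguments, int i) {
        if (optionalArguments.isPresent()) {
            return optionalArguments.get().get(i);
        } else {
            return false;
        }
    }

    // Null-safe variant: Boolean.TRUE.equals(null) is false, so a null
    // element is treated as "not optional" instead of failing.
    static boolean isOptional(Optional<List<Boolean>> optionalArguments, int i) {
        return optionalArguments
                .map(flags -> Boolean.TRUE.equals(flags.get(i)))
                .orElse(false);
    }

    public static void main(String[] args) {
        Optional<List<Boolean>> declared = Optional.of(Arrays.asList(true, null, false));
        System.out.println(isOptional(declared, 0));
        System.out.println(isOptional(declared, 1));
        try {
            isOptionalUnsafe(declared, 1);
        } catch (NullPointerException e) {
            System.out.println("unboxing a null Boolean failed");
        }
    }
}
```

If null elements can be ruled out earlier (for example during extraction), the unsafe form is fine and this guard is unnecessary.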



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -116,7 +127,17 @@ public Consistency getConsistency() {
 
     @Override
     public boolean isOptional(int i) {
-        return false;
+        Optional<List<Boolean>> optionalArguments = typeInference.getOptionalArguments();
+        if (optionalArguments.isPresent()) {
+            return optionalArguments.get().get(i);
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean isFixedParameters() {
+        return typeInference.getTypedArguments().isPresent();

Review Comment:
   For the current situation, this code is fine. (I'm wondering whether it would be possible to use `isVarArg` from `FunctionHint` here?)



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlProcedureCallConverter.java:
##########
@@ -70,13 +73,30 @@ public Operation convertSqlNode(SqlNode sqlNode, ConvertContext context) {
 
         SqlCallBinding sqlCallBinding =
                 new SqlCallBinding(context.getSqlValidator(), null, callProcedure);
+
+        TypeInference typeInference =
+                procedureDefinition.getTypeInference(
+                        context.getCatalogManager().getDataTypeFactory());
+        List<RexNode> reducedOperands = reduceOperands(sqlCallBinding.operands(), context);

Review Comment:
   If we unify the logic that corrects the return type of `DEFAULT`, would the changes in this class still be necessary?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
+    if (!hasAssignment(call)) {
+      return Array.empty
+    }
+
+    call.getOperator.getOperandTypeChecker
+      .asInstanceOf[SqlOperandMetadata]
+      .paramTypes(validator.getTypeFactory)
+      .asScala
+      .toArray
+  }
+
+  def hasAssignment(call: SqlCall): Boolean = {

Review Comment:
   Ditto. BTW, how about adding a note that this function is copied from `SqlCallBinding#hasAssignment`?



##########
flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java:
##########
@@ -6101,6 +6105,13 @@ private void translateAgg(
                         }
                     }
                     RexNode convertedExpr = bb.convertExpression(operand);
+                    if (SqlKind.DEFAULT == convertedExpr.getKind()) {
+                        convertedExpr =
+                                ((RexCall) convertedExpr)
+                                        .clone(
+                                                defaultTypes[index],
+                                                ((RexCall) convertedExpr).operands);
+                    }

Review Comment:
   Nit: update `// ----- FLINK MODIFICATION END -----`



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {
+    if (!hasAssignment(call)) {
+      return Array.empty
+    }
+
+    call.getOperator.getOperandTypeChecker
+      .asInstanceOf[SqlOperandMetadata]
+      .paramTypes(validator.getTypeFactory)
+      .asScala
+      .toArray
+  }
+
+  def hasAssignment(call: SqlCall): Boolean = {

Review Comment:
   Mark as private



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ExprCodeGenerator.scala:
##########
@@ -465,20 +467,38 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
         call.getOperands.get(1).asInstanceOf[RexLiteral])
     }
 
+    // replace default node with right type.
+    val operands = new util.ArrayList[RexNode](call.operands)
+
     // convert operands and help giving untyped NULL literals a type
-    val operands = call.getOperands.zipWithIndex.map {
+    val expressions = call.getOperands.zipWithIndex.map {
 
       // this helps e.g. for AS(null)
       // we might need to extend this logic in case some rules do not create typed NULLs
       case (operandLiteral: RexLiteral, 0)
           if operandLiteral.getType.getSqlTypeName == SqlTypeName.NULL &&
             call.getOperator.getReturnTypeInference == ReturnTypes.ARG0 =>
         generateNullLiteral(resultType)
-
+      case (rexCall: RexCall, i)
+          if (rexCall.getKind == SqlKind.DEFAULT && call.getOperator
+            .isInstanceOf[BridgingSqlFunction]) => {
+        val sqlFunction = call.getOperator.asInstanceOf[BridgingSqlFunction]
+        val typeInference = sqlFunction.getTypeInference
+        val typeFactory = sqlFunction.getTypeFactory
+        if (typeInference.getTypedArguments.isPresent) {
+          val dataType = typeInference.getTypedArguments.get().get(i).getLogicalType
+          operands.set(
+            i,
+            rexCall.clone(typeFactory.createFieldTypeFromLogicalType(dataType), rexCall.operands))

Review Comment:
   The return type of the `DEFAULT` function is overridden both in SqlToRel and here, which seems a bit too scattered. Can we consolidate this into the SqlToRel phase only?



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/FlinkRexUtil.scala:
##########
@@ -653,6 +655,22 @@ object FlinkRexUtil {
       rexBuilder,
       converter);
   }
+
+  def extractDefaultTypes(call: SqlCall, validator: SqlValidator): Array[RelDataType] = {

Review Comment:
   Nit: add some comments on this function.



##########
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/TypeInferenceOperandChecker.java:
##########
@@ -101,7 +104,15 @@ public boolean checkOperandTypes(SqlCallBinding 
callBinding, boolean throwOnFail
 
     @Override
     public SqlOperandCountRange getOperandCountRange() {
-        return countRange;
+        if (typeInference.getOptionalArguments().isPresent()
+                && typeInference.getOptionalArguments().get().stream()
+                        .anyMatch(Boolean::booleanValue)) {
+            ArgumentCount argumentCount = ConstantArgumentCount.between(0, countRange.getMax());

Review Comment:
   I think the minimum count should be the number of `false` entries in `typeInference.getOptionalArguments()`, right?
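A minimal sketch of that computation, assuming the flags arrive as a `List<Boolean>` with one entry per declared argument. `OperandCountSketch` and `minRequired` are hypothetical names, not Flink API, and treating a null entry as required is an assumption of this sketch.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not Flink API: derives the minimum operand count
// from per-argument "optional" flags.
final class OperandCountSketch {

    // Every argument that is not explicitly optional must be supplied,
    // so the minimum count is the number of non-true entries.
    static int minRequired(List<Boolean> optionalFlags) {
        int required = 0;
        for (Boolean flag : optionalFlags) {
            if (!Boolean.TRUE.equals(flag)) {
                required++;
            }
        }
        return required;
    }

    public static void main(String[] args) {
        // f1 is required, f2 and f3 are optional.
        List<Boolean> flags = Arrays.asList(false, true, true);
        System.out.println(minRequired(flags)); // prints 1
    }
}
```

The result would take the place of the `0` currently passed as the lower bound to `ConstantArgumentCount.between`.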



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/extraction/TypeInferenceExtractorTest.java:
##########
@@ -1810,4 +1844,30 @@ public String eval(String f1, String f2) {
             return "";
         }
     }
+
+    private static class ArgumentHintScalarFunctionNotNullTypeWithOptionals extends ScalarFunction {
+        @FunctionHint(
+                argument = {
+                    @ArgumentHint(
+                            type = @DataTypeHint("STRING NOT NULL"),
+                            name = "f1",
+                            isOptional = true),
+                    @ArgumentHint(type = @DataTypeHint("INTEGER"), name = "f2", isOptional = true)
+                })
+        public String eval(String f1, Integer f2) {

Review Comment:
   What happens when a parameter that is declared with `isOptional = true` has a primitive type, such as `int`? Can we add an early check so that the error message is clear, instead of potentially running into strange errors during code generation or reflection?
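A minimal sketch of such an early check, assuming the parameter classes of the `eval` method and the per-argument optional flags are both available at extraction time. `OptionalPrimitiveCheck` and `validate` are hypothetical names, not Flink API; the real extractor would presumably raise its own validation error rather than `IllegalArgumentException`.

```java
// Hypothetical helper, not Flink API: rejects optional arguments whose
// Java parameter type is primitive, since a primitive cannot hold null.
final class OptionalPrimitiveCheck {

    static void validate(Class<?>[] parameterTypes, boolean[] isOptional) {
        if (parameterTypes.length != isOptional.length) {
            throw new IllegalArgumentException(
                    "Mismatch between number of parameters and optional flags.");
        }
        for (int i = 0; i < parameterTypes.length; i++) {
            if (isOptional[i] && parameterTypes[i].isPrimitive()) {
                throw new IllegalArgumentException(
                        "Argument at position " + i + " is optional but has primitive type '"
                                + parameterTypes[i].getName()
                                + "'. Use the boxed type instead.");
            }
        }
    }

    public static void main(String[] args) {
        // eval(String f1, Integer f2): both optional, both nullable -> accepted.
        validate(new Class<?>[] {String.class, Integer.class}, new boolean[] {true, true});

        // eval(String f1, int f2): f2 is optional but primitive -> rejected.
        try {
            validate(new Class<?>[] {String.class, int.class}, new boolean[] {true, true});
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```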



##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/extraction/TypeInferenceExtractor.java:
##########
@@ -286,6 +305,15 @@ private static void configureTypedArguments(
 
     private static TypeStrategy translateResultTypeStrategy(
             Map<FunctionSignatureTemplate, FunctionResultTemplate> resultMapping) {
+        if (resultMapping.size() == 1) {

Review Comment:
   This part of the code should not be modified. Instead, we should change the logic of `argumentDataTypes#size()` in the `OperatorBindingCallContext` constructor:
   ```
   this.argumentDataTypes =
           new AbstractList<DataType>() {
               @Override
               public DataType get(int pos) {
                   if (binding instanceof SqlCallBinding) {
                       SqlCallBinding sqlCallBinding = (SqlCallBinding) binding;
                       List<SqlNode> operands = sqlCallBinding.operands();
                       final RelDataType relDataType =
                               sqlCallBinding
                                       .getValidator()
                                       .deriveType(sqlCallBinding.getScope(), operands.get(pos));
                       final LogicalType logicalType =
                               FlinkTypeFactory.toLogicalType(relDataType);
                       return TypeConversions.fromLogicalToDataType(logicalType);
                   } else {
                       final LogicalType logicalType =
                               toLogicalType(binding.getOperandType(pos));
                       return fromLogicalToDataType(logicalType);
                   }
               }

               @Override
               public int size() {
                   if (binding instanceof SqlCallBinding) {
                       return ((SqlCallBinding) binding).operands().size();
                   } else {
                       return binding.getOperandCount();
                   }
               }
           };
   ```
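The pattern behind this suggestion can be shown in isolation: a read-only `AbstractList` view that computes both `size()` and `get(int)` from the underlying source on every call, so the two can never disagree about how many operands exist. This is only a sketch; `LazyListView`, `sizeSupplier`, and `elementAt` are hypothetical names and not part of Flink or Calcite.

```java
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.IntFunction;
import java.util.function.IntSupplier;

// Hypothetical helper, not Flink API: a read-only list view whose size and
// elements are derived lazily from the same source.
final class LazyListView {

    static <T> List<T> of(IntSupplier sizeSupplier, IntFunction<T> elementAt) {
        return new AbstractList<T>() {
            @Override
            public T get(int pos) {
                if (pos < 0 || pos >= size()) {
                    throw new IndexOutOfBoundsException("Index: " + pos);
                }
                return elementAt.apply(pos);
            }

            @Override
            public int size() {
                // Re-evaluated on each call, so it tracks the source.
                return sizeSupplier.getAsInt();
            }
        };
    }

    public static void main(String[] args) {
        List<String> operands = new ArrayList<>(Arrays.asList("a", "b"));
        List<String> view = of(operands::size, i -> operands.get(i).toUpperCase());
        System.out.println(view);
        operands.add("c");
        System.out.println(view.size());
    }
}
```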



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
