[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3389

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r105187427

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -139,9 +143,35 @@ object UserDefinedFunctionUtils {
         s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
           s"one method named 'eval' which is public, not abstract and " +
           s"(in case of table functions) not static.")
-    } else {
-      methods
     }
+
+    verifyScalaVarargsAnnotation(methods)
+    methods
+  }
+
+  /**
+    * If users specified an @varargs, Scala will generate two methods indeed.
+    * If there does not exist corresponding varargs method of the Seq method,
+    * we will throw an ValidationException.
+    */
+  def verifyScalaVarargsAnnotation(methods: Array[Method]) = {
+    methods.foreach(method => {
+      val signatures = method.getParameterTypes
+      if (signatures.nonEmpty &&
+        signatures.last.getName.equals("scala.collection.Seq") &&
```
--- End diff --

Can you add a test data to `UserDefinedScalarFunctionTest` with
```
data:     testData.setField(9, Seq("Hello", "World"))
typeInfo: org.apache.flink.api.scala.createTypeInformation[Seq[String]]
test:
testAllApis(
  Func16('f9),
  "Func15(f9)",
  "Func15(f9)",
  "Hello, World"
)
```
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r105186593

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -139,9 +143,35 @@ object UserDefinedFunctionUtils {
         s"Function class '${function.getClass.getCanonicalName}' does not implement at least " +
           s"one method named 'eval' which is public, not abstract and " +
           s"(in case of table functions) not static.")
-    } else {
-      methods
     }
+
+    verifyScalaVarargsAnnotation(methods)
+    methods
+  }
+
+  /**
+    * If users specified an @varargs, Scala will generate two methods indeed.
+    * If there does not exist corresponding varargs method of the Seq method,
+    * we will throw an ValidationException.
+    */
+  def verifyScalaVarargsAnnotation(methods: Array[Method]) = {
+    methods.foreach(method => {
+      val signatures = method.getParameterTypes
+      if (signatures.nonEmpty &&
+        signatures.last.getName.equals("scala.collection.Seq") &&
```
--- End diff --

I think this condition is too strict. It wouldn't allow `Seq`s in functions, even though `Seq` is one of the most important classes in Scala. This does not work right now:
```
object Func16 extends ScalarFunction {
  def eval(a: Seq[String]): String = {
    a.mkString(", ")
  }
}
```
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r105182198

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala ---
```
@@ -221,3 +223,18 @@ class Func13(prefix: String) extends ScalarFunction {
   }
 }

+object Func14 extends ScalarFunction {
+
+  @varargs
+  def eval(a: Int*): Int = {
+    a.sum
+  }
+}
+
+object Func15 extends ScalarFunction {
```
--- End diff --

This does not work:
```
object Func15 extends ScalarFunction {

  @varargs
  def eval(a: String, b: Int*): String = {
    a + b.length
  }

  def eval(a: String): String = {
    a
  }
}
```
It leads to `Found multiple 'eval' methods which match the signature.`
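The ambiguity reported above comes from the bytecode that `@varargs` produces. A minimal, self-contained sketch (the object name `VarargsDemo` is hypothetical, not part of the PR) makes this visible via reflection:

```scala
import scala.annotation.varargs

// Scala compiles `eval(a: String, b: Int*)` to eval(String, Seq[Int]) and,
// because of the @varargs annotation, also emits a Java-varargs bridge
// method eval(String, int[]).
object VarargsDemo {
  @varargs
  def eval(a: String, b: Int*): String = a + b.length
}

// Reflecting over the object reveals both JVM signatures for the single
// Scala definition, which is why adding another `eval` overload can make
// the signature lookup find multiple candidates.
val evalMethods = VarargsDemo.getClass.getMethods.filter(_.getName == "eval")
evalMethods.foreach(m => println(m.getParameterTypes.map(_.getSimpleName).mkString(", ")))
```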
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104614715

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -94,23 +94,28 @@ object UserDefinedFunctionUtils {
     val evalMethods = checkAndExtractEvalMethods(function)

     val filtered = evalMethods
-      // go over all eval methods and find one matching
-      .filter { cur =>
-        val signatures = cur.getParameterTypes
-        // match parameters of signature to actual parameters
-        (actualSignature.length >= signatures.length &&
-          actualSignature.zipWithIndex.forall { case (clazz, i) =>
-            (i < signatures.length && parameterTypeEquals(clazz, signatures(i))) ||
-              (i >= signatures.length - 1 && cur.isVarArgs &&
-                parameterTypeEquals(clazz, signatures.last.getComponentType))
-          }) ||
-          // match empty variable arguments
-          (actualSignature.length == signatures.length - 1 && cur.isVarArgs)
+      // go over all eval methods and filter out one and only one matching
+      .filter {
+        case cur if !cur.isVarArgs =>
+          val signatures = cur.getParameterTypes
+          // match parameters of signature to actual parameters
+          actualSignature.length == signatures.length &&
+            signatures.zipWithIndex.forall { case (clazz, i) =>
+              parameterTypeEquals(actualSignature(i), clazz)
+            }
+        case cur if cur.isVarArgs =>
+          val signatures = cur.getParameterTypes
+          actualSignature.zipWithIndex.forall { case (clazz, i) =>
+            (i < signatures.length - 1 &&
+              parameterTypeEquals(clazz, signatures(i))) ||
+              (i >= signatures.length - 1 &&
+                parameterTypeEquals(clazz, signatures.last.getComponentType))
+          } || (actualSignature.isEmpty && signatures.length == 1)
```
--- End diff --

move this to a new line
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104614583

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -94,23 +94,28 @@ object UserDefinedFunctionUtils {
     val evalMethods = checkAndExtractEvalMethods(function)

     val filtered = evalMethods
-      // go over all eval methods and find one matching
-      .filter { cur =>
-        val signatures = cur.getParameterTypes
-        // match parameters of signature to actual parameters
-        (actualSignature.length >= signatures.length &&
-          actualSignature.zipWithIndex.forall { case (clazz, i) =>
-            (i < signatures.length && parameterTypeEquals(clazz, signatures(i))) ||
-              (i >= signatures.length - 1 && cur.isVarArgs &&
+      // go over all eval methods and filter out one and only one matching
+      .filter {
+        case cur if !cur.isVarArgs =>
+          val signatures = cur.getParameterTypes
+          // match parameters of signature to actual parameters
+          actualSignature.length == signatures.length &&
+            signatures.zipWithIndex.forall { case (clazz, i) =>
+              parameterTypeEquals(actualSignature(i), clazz)
+            }
+        case cur if cur.isVarArgs =>
+          val signatures = cur.getParameterTypes
+          actualSignature.zipWithIndex.forall { case (clazz, i) =>
+            (i < signatures.length - 1 &&
```
--- End diff --

We can also move the condition check about `i` into the case pattern to make the code clearer, like:
```
case (clazz, i) if i < signatures.length - 1 =>
  // ...
case (clazz, i) if i >= signatures.length - 1 =>
  // ...
```
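The guard-based matching suggested here can be sketched in isolation. The signature and operand arrays below are hypothetical stand-ins (a varargs `eval(String, Int*)` called with one `String` and two `Int`s), not the actual Flink matcher:

```scala
// The trailing Int* parameter is compiled to an Int array; extra operands
// must match that array's component type.
val signatures: Array[Class[_]] = Array(classOf[String], classOf[Array[Int]])
val actual: Array[Class[_]]     = Array(classOf[String], classOf[Int], classOf[Int])

val matches = actual.zipWithIndex.forall {
  case (clazz, i) if i < signatures.length - 1 =>
    clazz == signatures(i)                    // fixed leading parameters
  case (clazz, _) =>
    clazz == signatures.last.getComponentType // varargs tail
}
```

The guards keep the two matching regimes (fixed prefix vs. varargs tail) visually separate, which is the readability gain the comment is after.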
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104609713

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
```
@@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function signature and result class
-    val matchingSignature = getSignature(scalarFunction, signature)
+    val matchingMethod = getEvalMethod(scalarFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
+    val matchingSignature = matchingMethod.getParameterTypes
     val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    var i = paramToOperands.length
+    while (i < operands.length
```
--- End diff --

Sure. I think there will be something similar in https://github.com/apache/flink/pull/3407 (FLINK-5882)
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104605350

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty &&
+          signatures.last.getName.equals("scala.collection.Seq") &&
+          // If users specified an @varargs, Scala will generate two methods indeed.
+          // If there does not exists corresponding varargs method of the Seq method,
```
--- End diff --

Typo.
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104600115

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
```
@@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function signature and result class
-    val matchingSignature = getSignature(scalarFunction, signature)
+    val matchingMethod = getEvalMethod(scalarFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
+    val matchingSignature = matchingMethod.getParameterTypes
     val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    var i = paramToOperands.length
+    while (i < operands.length
```
--- End diff --

Forget zipAll, I think you can write code like this instead of a while loop:
```
if (operands.length > matchingSignature.length) {
  operands.drop(matchingSignature.length).foreach(op =>
    paramToOperands = paramToOperands :+ (matchingSignature.last.getComponentType, op))
}
```
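The suggested drop/foreach padding can be tried out standalone. In this sketch the operands are simplified to plain strings (the real code pairs parameter classes with `GeneratedExpression`s), and the two-element signature is a hypothetical `eval(String, Int*)`:

```scala
// Hypothetical varargs signature and four call-site operands.
val matchingSignature: Array[Class[_]] = Array(classOf[String], classOf[Array[Int]])
val operands: Seq[String] = Seq("op0", "op1", "op2", "op3")

// zip pairs only as many operands as there are declared parameters ...
var paramToOperands: Seq[(Class[_], String)] = matchingSignature.toSeq.zip(operands)

// ... so every operand beyond that takes the component type of the
// trailing vararg array, exactly what the review comment proposes.
if (operands.length > matchingSignature.length) {
  operands.drop(matchingSignature.length).foreach(op =>
    paramToOperands = paramToOperands :+ ((matchingSignature.last.getComponentType, op)))
}
```

Compared with the original `while` loop and mutable index, this keeps the padding logic in one expression and cannot run past the end of `operands`.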
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104597703

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
```
@@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
       operands: Seq[GeneratedExpression])
     : GeneratedExpression = {
     // determine function signature and result class
-    val matchingSignature = getSignature(scalarFunction, signature)
+    val matchingMethod = getEvalMethod(scalarFunction, signature)
       .getOrElse(throw new CodeGenException("No matching signature found."))
+    val matchingSignature = matchingMethod.getParameterTypes
     val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    var i = paramToOperands.length
+    while (i < operands.length
```
--- End diff --

I think this while loop can be replaced by calling `zipAll`
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104595467

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---
```
@@ -136,8 +143,18 @@ object ScalarSqlFunction {
     }

     override def getOperandCountRange: SqlOperandCountRange = {
-      val signatureLengths = signatures.map(_.length)
-      SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
+      var min = 255
+      var max = -1
+      signatures.foreach(sig => {
+        var len = sig.length
+        if (len > 0 && sig(sig.length -1).isArray) {
```
--- End diff --

please add space before 1
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104584170

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      methods.foreach(method => {
```
--- End diff --

Can we move this check to a method like `verifyScalaVarargsAnnotation`?
Github user KurtYoung commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r104581688

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty &&
+          signatures.last.getName.equals("scala.collection.Seq") &&
+          // If users specified an @varargs, Scala will generate two methods indeed.
+          // If there does not exists corresponding varargs method of the Seq method,
```
--- End diff --

If there does not exists corresponding varargs annotation of the eval method
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103632353

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty &&
+          signatures.last.getName.equals("scala.collection.Seq") &&
+          // If users specified an @varargs, Scala will generate two methods indeed.
+          // If there does not exists corresponding varargs method of the Seq method,
+          // we will throw an ValidationException.
+          (!methods.exists(m => {
+            val sigs = m.getParameterTypes
+            m.isVarArgs &&
+              sigs.length == signatures.length &&
+              sigs.zipWithIndex.forall { case (sig, i) =>
+                i == sigs.length - 1 || sig.equals(signatures(i))
+              }
+          }))) {
+          throw new ValidationException("The 'eval' method do not support Scala type of " +
```
--- End diff --

I've updated the patch and refactored the logic.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103632469

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---
```
@@ -112,9 +112,16 @@ object ScalarSqlFunction {
         .getParameterTypes(foundSignature)
         .map(typeFactory.createTypeFromTypeInfo)

-      inferredTypes.zipWithIndex.foreach {
-        case (inferredType, i) =>
-          operandTypes(i) = inferredType
+      operandTypes.zipWithIndex.foreach {
+        case (_, i) =>
+          if (i < inferredTypes.length - 1) {
+            operandTypes(i) = inferredTypes(i)
+          } else if (null != inferredTypes.last.getComponentType) {
+            // last arguments is a collection, the array type
+            operandTypes(i) = inferredTypes.last.getComponentType
+          } else {
+            operandTypes(i) = inferredTypes.last
+          }
```
--- End diff --

The logic here has also been changed.
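The reworked inference above can be exercised in a standalone sketch. Here `Class[_]` is a hypothetical stand-in for Calcite's `RelDataType`, and the two-element `inferredTypes` models an `eval(String, Int*)` called with three operands:

```scala
// Last inferred type is the vararg array type; the call site passed three operands.
val inferredTypes: Array[Class[_]] = Array(classOf[String], classOf[Array[Int]])
val operandTypes = new Array[Class[_]](3)

// Iterate over the operand slots (not the inferred types), falling back to
// the trailing array's component type for the varargs tail. This is the
// shape of the fix: it cannot index past either array.
operandTypes.indices.foreach { i =>
  operandTypes(i) =
    if (i < inferredTypes.length - 1) inferredTypes(i)
    else if (inferredTypes.last.getComponentType != null) inferredTypes.last.getComponentType
    else inferredTypes.last
}
```

Driving the loop by `operandTypes` also handles the `eval("hello")` case flagged earlier, where the operand count is smaller than the inferred-type count.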
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103369181

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      var trailingSeq = false
+      var noVargs = true
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty) {
+          if (method.isVarArgs) {
+            noVargs = false
+          } else if (signatures.last.getName.equals("scala.collection.Seq")) {
+            trailingSeq = true
+          }
+        }
+      })
+      if (trailingSeq && noVargs) {
+        // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+        throw new ValidationException("The 'eval' method do not support Scala type of " +
```
--- End diff --

I think one of the important things here is to check type by type.
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103361639

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      var trailingSeq = false
+      var noVargs = true
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty) {
+          if (method.isVarArgs) {
+            noVargs = false
+          } else if (signatures.last.getName.equals("scala.collection.Seq")) {
+            trailingSeq = true
+          }
+        }
+      })
+      if (trailingSeq && noVargs) {
+        // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+        throw new ValidationException("The 'eval' method do not support Scala type of " +
```
--- End diff --

If there are multiple eval methods found (not ambiguous), one varargs and the other not, it seems that no exception is thrown to tell users that the non-varargs eval method does not work.
```
@varargs
def eval(args: String*): String = {...}

// no varargs annotation
def eval(args: Int*): Int = {...}
```
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r103366163

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---
```
@@ -114,7 +114,9 @@ object ScalarSqlFunction {
       inferredTypes.zipWithIndex.foreach {
         case (inferredType, i) =>
-          operandTypes(i) = inferredType
+          if (operandTypes.length > 0) {
+            operandTypes(i) = inferredType
```
--- End diff --

If this is a varargs method, the inferredType is an array type. The operand type should be the component type of the array type, not the array type itself.

And the `operandTypes.length > 0` condition is still not safe: say the method is `eval(String a, int... b)` and it is called as `eval("hello")`; the `operandTypes` length is 1, but the `inferredTypes` length is 2. An IndexOutOfBoundsException would be thrown as before.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102864763

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      var trailingSeq = false
+      var noVargs = true
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty) {
+          if (method.isVarArgs) {
+            noVargs = false
+          } else if (signatures.last.getName.equals("scala.collection.Seq")) {
+            trailingSeq = true
+          }
+        }
+      })
+      if (trailingSeq && noVargs) {
+        // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+        throw new ValidationException("The 'eval' method do not support Scala type of " +
```
--- End diff --

This is correct, because if there are multiple methods found (overloads), it will throw another exception.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102865002

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -78,20 +78,7 @@ object UserDefinedFunctionUtils {
       function: UserDefinedFunction,
       signature: Seq[TypeInformation[_]])
     : Option[Array[Class[_]]] = {
-    // We compare the raw Java classes not the TypeInformation.
-    // TypeInformation does not matter during runtime (e.g. within a MapFunction).
-    val actualSignature = typeInfoToClass(signature)
-    val signatures = getSignatures(function)
-
-    signatures
-      // go over all signatures and find one matching actual signature
-      .find { curSig =>
-        // match parameters of signature to actual parameters
-        actualSignature.length == curSig.length &&
-          curSig.zipWithIndex.forall { case (clazz, i) =>
-            parameterTypeEquals(actualSignature(i), clazz)
-          }
-      }
```
--- End diff --

I deleted them because both methods were simply copy and paste. One was used for ScalarFunction, the other for TableFunction.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102632219

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
```
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
       .getOrElse(throw new CodeGenException("No matching signature found."))
     val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    var i = paramToOperands.length
+    while (i < operands.length) {
+      paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i))
```
--- End diff --

I will try to use `getComponentType()`
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102626945

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala ---
```
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
       .getOrElse(throw new CodeGenException("No matching signature found."))
     val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    var i = paramToOperands.length
+    while (i < operands.length) {
+      paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i))
```
--- End diff --

Thanks, @wuchong. Yes, it's a mistake here, and the tests haven't covered this situation. Since the max number of arguments is 254, I don't think it is necessary to use a component type at the code generation phase. I will try to add some tests to cover this situation.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102628732

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala ---
```
@@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   }

   @Test
+  def testVariableArgs(): Unit = {
+    testAllApis(
```
--- End diff --

Sure. Thanks @wuchong. As we discussed above, I will add more tests here.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102628111

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala ---
```
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
         s"one method named 'eval' which is public, not abstract and " +
         s"(in case of table functions) not static.")
     } else {
+      var trailingSeq = false
+      var trailingArray = false
+      methods.foreach(method => {
+        val signatures = method.getParameterTypes
+        if (signatures.nonEmpty) {
+          val trailingArg = signatures(signatures.length - 1)
+          if (trailingArg.getName.equals("scala.collection.Seq")) {
+            trailingSeq = true
+          } else if (trailingArg.isArray) {
+            trailingArray = true
+          }
+        }
+      })
+      if (trailingSeq && !trailingArray) {
```
--- End diff --

If the users use the annotation `@scala.annotation.varargs`, Scala will generate two signatures for the method: one is `T eval(scala.collection.Seq args)`, the other is `T eval(T[] args)`. A better idea is to compare every argument of the signature. We can then make sure that either there is only one method `T eval(T[] args)`, or there are two methods: `T eval(scala.collection.Seq args)` and `T eval(T[] args)`.
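The per-argument comparison described here can be sketched as a small helper. This is a hedged illustration, not the code that landed in the PR; the helper name `hasVarargsTwin` is hypothetical:

```scala
import java.lang.reflect.Method
import scala.annotation.varargs

// For a method whose last parameter is scala.collection.Seq, check that a
// varargs twin exists: same arity, identical leading parameters, and a
// trailing array in place of the Seq.
def hasVarargsTwin(methods: Array[Method], seqMethod: Method): Boolean = {
  val sigs = seqMethod.getParameterTypes
  methods.exists { m =>
    m.isVarArgs &&
      m.getParameterTypes.length == sigs.length &&
      m.getParameterTypes.zip(sigs).dropRight(1).forall { case (a, b) => a == b }
  }
}

// With a properly annotated function, the Seq-taking method always has a twin.
object AnnotatedDemo {
  @varargs
  def eval(a: Int*): Int = a.sum
}
val ms = AnnotatedDemo.getClass.getMethods.filter(_.getName == "eval")
val seqMethod = ms.find(_.getParameterTypes.last.getName == "scala.collection.Seq").get
```

Comparing argument by argument (rather than only looking at the last parameter) is what prevents false positives when several overloaded `eval` methods coexist.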
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102627293

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
     // match parameters of signature to actual parameters
-    actualSignature.length == curSig.length &&
+    (actualSignature.length == curSig.length &&
       curSig.zipWithIndex.forall { case (clazz, i) =>
         parameterTypeEquals(actualSignature(i), clazz)
-      }
+      }) ||
+    // matching the style which last argument is variable, eg. "Type..." "Type*"
+    (actualSignature.length >= curSig.length &&
```

--- End diff --

Thanks @wuchong. I will do some tests and revisions to handle this.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102627751

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
     // match parameters of signature to actual parameters
-    actualSignature.length == curSig.length &&
+    (actualSignature.length == curSig.length &&
       curSig.zipWithIndex.forall { case (clazz, i) =>
         parameterTypeEquals(actualSignature(i), clazz)
-      }
+      }) ||
+    // matching the style which last argument is variable, eg. "Type..." "Type*"
+    (actualSignature.length >= curSig.length &&
+      curSig.zipWithIndex.forall { case (clazz, i) =>
+        parameterTypeEquals(actualSignature(i), clazz) ||
+          (i == curSig.length - 1 && clazz.isArray)
```

--- End diff --

Thanks @wuchong. They are good suggestions.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102628665

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
       s"one method named 'eval' which is public, not abstract and " +
       s"(in case of table functions) not static.")
   } else {
+    var trailingSeq = false
+    var trailingArray = false
+    methods.foreach(method => {
+      val signatures = method.getParameterTypes
+      if (signatures.nonEmpty) {
+        val trailingArg = signatures(signatures.length - 1)
+        if (trailingArg.getName.equals("scala.collection.Seq")) {
+          trailingSeq = true
+        } else if (trailingArg.isArray) {
+          trailingArray = true
+        }
+      }
+    })
+    if (trailingSeq && !trailingArray) {
+      // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+      throw new ValidationException("The 'eval' method do not support Scala type of " +
+        "variable args eg. scala.collection.Seq or Type*, please add a @varargs annotation " +
```

--- End diff --

Sure. Thanks @wuchong.
Github user clarkyzl commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102483144

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala

```diff
@@ -136,8 +136,18 @@ object ScalarSqlFunction {
   }

   override def getOperandCountRange: SqlOperandCountRange = {
-    val signatureLengths = signatures.map(_.length)
-    SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
+    var min = 255
+    var max = -1
+    signatures.foreach(sig => {
+      var len = sig.length
+      if (len > 0 && sig(sig.length - 1).isArray) {
+        max = 254 // according to JVM spec 4.3.3
```

--- End diff --

Hi @wuchong. Yes, according to the JVM specification, a method may have up to 255 parameters if it is static. If the method is not static, the `this` reference occupies one of the parameter slots, so the limit is 254. FYI: http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.3.3
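The operand-count logic discussed above can be sketched as follows. This is a hedged sketch, assuming signatures are given as arrays of parameter classes and assuming the truncated diff continues with a symmetric min/max update; the `operandCountRange` helper is illustrative, not the actual Flink code:

```java
public class OperandCountSketch {

    // A varargs signature raises max to 254: JVMS section 4.3.3 caps a method
    // at 255 parameter slots, and `this` occupies one slot on instance methods.
    public static int[] operandCountRange(Class<?>[][] signatures) {
        int min = 255;
        int max = -1;
        for (Class<?>[] sig : signatures) {
            int len = sig.length;
            if (len > 0 && sig[len - 1].isArray()) {
                max = 254;  // effectively unbounded, capped only by the JVM
                len -= 1;   // the varargs tail may be empty
            } else if (len > max) {
                max = len;
            }
            if (len < min) {
                min = len;
            }
        }
        return new int[] {min, max};
    }

    public static void main(String[] args) {
        // eval(String, String...) accepts between 1 and 254 operands
        int[] range = operandCountRange(
            new Class<?>[][] {{String.class, String[].class}});
        System.out.println(range[0] + ".." + range[1]);
    }
}
```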
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102448630

Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala

```diff
@@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends ExpressionTestBase {
   }

   @Test
+  def testVariableArgs(): Unit = {
+    testAllApis(
```

--- End diff --

Can you add a test to check whether the zero-argument case passes?
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102475557

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala

```diff
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
       .getOrElse(throw new CodeGenException("No matching signature found."))
     val resultClass = getResultTypeClass(scalarFunction, matchingSignature)

+    // zip for variable signatures
+    var paramToOperands = matchingSignature.zip(operands)
+    var i = paramToOperands.length
+    while (i < operands.length) {
+      paramToOperands = paramToOperands :+ (matchingSignature.head, operands(i))
```

--- End diff --

Why take the head of `matchingSignature`? Isn't the vararg the last one of the signatures? And I think we should take the component type of the vararg array type to be the operand class.
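The pairing wuchong suggests (each extra operand paired with the component type of the trailing vararg array, rather than with the head of the signature) can be sketched like this; a simplified, hypothetical stand-in for the codegen step, where `expandSignature` is not an actual Flink helper:

```java
import java.util.Arrays;

public class VarargsExpansion {

    // Expand a matched varargs signature so that every operand beyond the
    // fixed parameters is paired with the vararg's component type.
    public static Class<?>[] expandSignature(Class<?>[] signature, int operandCount) {
        Class<?> component = signature[signature.length - 1].getComponentType();
        Class<?>[] expanded = new Class<?>[operandCount];
        for (int i = 0; i < operandCount; i++) {
            expanded[i] = (i < signature.length - 1) ? signature[i] : component;
        }
        return expanded;
    }

    public static void main(String[] args) {
        // eval(String, int...) called with four operands: the last three
        // operands are all paired with int, the array's component type
        System.out.println(Arrays.toString(
            expandSignature(new Class<?>[] {String.class, int[].class}, 4)));
    }
}
```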
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102464697

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
     // match parameters of signature to actual parameters
-    actualSignature.length == curSig.length &&
+    (actualSignature.length == curSig.length &&
       curSig.zipWithIndex.forall { case (clazz, i) =>
         parameterTypeEquals(actualSignature(i), clazz)
-      }
+      }) ||
+    // matching the style which last argument is variable, eg. "Type..." "Type*"
+    (actualSignature.length >= curSig.length &&
```

--- End diff --

varargs can be empty, so `actualSignature.length` could be `curSig.length - 1`.
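The empty-varargs case wuchong points out (an actual arity of `curSig.length - 1`) can be captured by a matcher along these lines. This is a minimal sketch assuming exact class equality instead of Flink's `parameterTypeEquals`; the `matchesVarargs` helper is illustrative:

```java
public class VarargsMatcher {

    // Match an actual call signature against a declared one whose last
    // parameter may be a varargs array. The varargs tail may be empty,
    // so actual.length can be declared.length - 1.
    public static boolean matchesVarargs(Class<?>[] actual, Class<?>[] declared) {
        if (declared.length == 0 || !declared[declared.length - 1].isArray()) {
            return java.util.Arrays.equals(actual, declared);
        }
        int fixed = declared.length - 1;
        if (actual.length < fixed) {
            return false;                       // too few fixed arguments
        }
        for (int i = 0; i < fixed; i++) {
            if (actual[i] != declared[i]) {
                return false;                   // fixed argument mismatch
            }
        }
        Class<?> comp = declared[declared.length - 1].getComponentType();
        for (int i = fixed; i < actual.length; i++) {
            if (actual[i] != comp) {
                return false;                   // varargs element mismatch
            }
        }
        return true;                            // zero or more matching varargs
    }

    public static void main(String[] args) {
        Class<?>[] declared = {String.class, String[].class};
        // empty varargs tail: one actual argument against two declared params
        System.out.println(matchesVarargs(new Class<?>[] {String.class}, declared));
    }
}
```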
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102472512

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
       s"one method named 'eval' which is public, not abstract and " +
       s"(in case of table functions) not static.")
   } else {
+    var trailingSeq = false
+    var trailingArray = false
+    methods.foreach(method => {
+      val signatures = method.getParameterTypes
+      if (signatures.nonEmpty) {
+        val trailingArg = signatures(signatures.length - 1)
+        if (trailingArg.getName.equals("scala.collection.Seq")) {
+          trailingSeq = true
+        } else if (trailingArg.isArray) {
+          trailingArray = true
+        }
+      }
+    })
+    if (trailingSeq && !trailingArray) {
```

--- End diff --

The `eval` method can be overloaded, such as:

```scala
def eval(args: Array[Int]): Int = {}
def eval(args: Int*): Int = {}
```

In this case, the methods can satisfy the condition, but the exception should be thrown.
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102448156

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
       s"one method named 'eval' which is public, not abstract and " +
       s"(in case of table functions) not static.")
   } else {
+    var trailingSeq = false
+    var trailingArray = false
+    methods.foreach(method => {
+      val signatures = method.getParameterTypes
+      if (signatures.nonEmpty) {
+        val trailingArg = signatures(signatures.length - 1)
+        if (trailingArg.getName.equals("scala.collection.Seq")) {
+          trailingSeq = true
+        } else if (trailingArg.isArray) {
+          trailingArray = true
+        }
+      }
+    })
+    if (trailingSeq && !trailingArray) {
+      // We found trailing "scala.collection.Seq", but no trailing "Type[]", "Type..."
+      throw new ValidationException("The 'eval' method do not support Scala type of " +
+        "variable args eg. scala.collection.Seq or Type*, please add a @varargs annotation " +
```

--- End diff --

Would changing `@varargs` to `@scala.annotation.varargs` be clearer?
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102453140

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala

```diff
@@ -136,8 +136,18 @@ object ScalarSqlFunction {
   }

   override def getOperandCountRange: SqlOperandCountRange = {
-    val signatureLengths = signatures.map(_.length)
-    SqlOperandCountRanges.between(signatureLengths.min, signatureLengths.max)
+    var min = 255
+    var max = -1
+    signatures.foreach(sig => {
+      var len = sig.length
+      if (len > 0 && sig(sig.length - 1).isArray) {
+        max = 254 // according to JVM spec 4.3.3
```

--- End diff --

So a varargs method can only have 254 parameters?
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102470461

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
     // match parameters of signature to actual parameters
-    actualSignature.length == curSig.length &&
+    (actualSignature.length == curSig.length &&
       curSig.zipWithIndex.forall { case (clazz, i) =>
         parameterTypeEquals(actualSignature(i), clazz)
-      }
+      }) ||
+    // matching the style which last argument is variable, eg. "Type..." "Type*"
+    (actualSignature.length >= curSig.length &&
+      curSig.zipWithIndex.forall { case (clazz, i) =>
+        parameterTypeEquals(actualSignature(i), clazz) ||
+          (i == curSig.length - 1 && clazz.isArray)
```

--- End diff --

Even if the last parameter type is an array type, the method is possibly not a varargs method. For example, `public void eval(int[] arrays)` is not a varargs method but a regular one, so we should add a test to check that this feature does not affect regular array-parameter methods. The best way to check whether a method is varargs is to use `Method.isVarArgs()`.
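The `Method.isVarArgs()` distinction is easy to demonstrate: both declarations below erase to an `int[]` parameter, yet only one is a varargs method. The class and method names are illustrative only:

```java
import java.lang.reflect.Method;

public class VarArgsProbe {
    public void evalArray(int[] arrays) {}   // regular array parameter
    public void evalVar(int... values) {}    // true varargs

    public static void main(String[] args) throws Exception {
        // both methods are looked up with the same erased parameter, int[]
        Method plain = VarArgsProbe.class.getMethod("evalArray", int[].class);
        Method varargs = VarArgsProbe.class.getMethod("evalVar", int[].class);
        // only the second reports true, despite the identical erasure
        System.out.println(plain.isVarArgs() + " " + varargs.isVarArgs());
    }
}
```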
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/3389#discussion_r102465907

Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala

```diff
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
     // match parameters of signature to actual parameters
-    actualSignature.length == curSig.length &&
+    (actualSignature.length == curSig.length &&
       curSig.zipWithIndex.forall { case (clazz, i) =>
         parameterTypeEquals(actualSignature(i), clazz)
-      }
+      }) ||
+    // matching the style which last argument is variable, eg. "Type..." "Type*"
+    (actualSignature.length >= curSig.length &&
+      curSig.zipWithIndex.forall { case (clazz, i) =>
+        parameterTypeEquals(actualSignature(i), clazz) ||
+          (i == curSig.length - 1 && clazz.isArray)
```

--- End diff --

We should make sure that the component type of the array class is equal to all of the types of the last `actualSignature.length - curSig.length + 1` actual parameters.
GitHub user clarkyzl opened a pull request: https://github.com/apache/flink/pull/3389

[FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments

Type: New Feature
Priority: Major
Components: table, udf, ScalarFunction

Problem Definition: [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments

Design:
1. Modified the getSignature() method in UserDefinedFunctionUtils so that a trailing variable-argument parameter can be matched. A method declared as "(TypeA a, TypeB b, TypeC... c)", or as "(a: TypeA, b: TypeB, c: TypeC*)" with the annotation, will pass the lookup.
2. Modified the SqlOperandTypeChecker to make the count range of SQL operands flexible, so the SQL node passes Calcite's validation.
3. Modified the checkAndExtractEvalMethods() method to throw a human-readable ValidationException if the user declared variable arguments in Scala but forgot to add the "@varargs" annotation. Please see the discussion in FLINK-5826.

Impact Analysis: It is a minor modification and a new feature; its impact on UDFs is minimal.

Test: Added both Scala tests and Java tests for all APIs.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/clarkyzl/flink flink-5881

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3389.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3389

commit 60b68fdd66f8021f6f090e7372987d43362d5ef3
Author: Zhuoluo Yang
Date: 2017-02-22T10:53:34Z

    [FLINK-5881] [table] ScalarFunction(UDF) should support variable types and variable arguments