[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-13 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3389




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-09 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r105187427
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -139,9 +143,35 @@ object UserDefinedFunctionUtils {
 s"Function class '${function.getClass.getCanonicalName}' does not 
implement at least " +
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
-} else {
-  methods
 }
+
+verifyScalaVarargsAnnotation(methods)
+methods
+  }
+
+  /**
+   * If users specified an @varargs, Scala will generate two methods 
indeed.
+   * If there does not exist corresponding varargs method of the Seq 
method,
+   * we will throw an ValidationException.
+   */
+  def verifyScalaVarargsAnnotation(methods: Array[Method]) = {
+methods.foreach(method => {
+  val signatures = method.getParameterTypes
+  if (signatures.nonEmpty &&
+signatures.last.getName.equals("scala.collection.Seq") &&
--- End diff --

Can you add test data to `UserDefinedScalarFunctionTest` with
```
data: testData.setField(9, Seq("Hello", "World"))
typeInfo: org.apache.flink.api.scala.createTypeInformation[Seq[String]]
test: testAllApis(
  Func16('f9),
  "Func15(f9)",
  "Func15(f9)",
  "Hello, World"
)
```




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-09 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r105186593
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -139,9 +143,35 @@ object UserDefinedFunctionUtils {
 s"Function class '${function.getClass.getCanonicalName}' does not 
implement at least " +
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
-} else {
-  methods
 }
+
+verifyScalaVarargsAnnotation(methods)
+methods
+  }
+
+  /**
+   * If users specified an @varargs, Scala will generate two methods 
indeed.
+   * If there does not exist corresponding varargs method of the Seq 
method,
+   * we will throw an ValidationException.
+   */
+  def verifyScalaVarargsAnnotation(methods: Array[Method]) = {
+methods.foreach(method => {
+  val signatures = method.getParameterTypes
+  if (signatures.nonEmpty &&
+signatures.last.getName.equals("scala.collection.Seq") &&
--- End diff --

I think this condition is too strict. It wouldn't allow `Seq`s in functions, even though `Seq` is one of the most important classes in Scala. This does not work right now:

```
object Func16 extends ScalarFunction {

  def eval(a: Seq[String]): String = {
a.mkString(", ")
  }
}
```




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-09 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r105182198
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/utils/UserDefinedScalarFunctions.scala
 ---
@@ -221,3 +223,18 @@ class Func13(prefix: String) extends ScalarFunction {
   }
 }
 
+object Func14 extends ScalarFunction {
+
+  @varargs
+  def eval(a: Int*): Int = {
+a.sum
+  }
+}
+
+object Func15 extends ScalarFunction {
--- End diff --

This does not work:

```
object Func15 extends ScalarFunction {

  @varargs
  def eval(a: String, b: Int*): String = {
a + b.length
  }

  def eval(a: String): String = {
a
  }
}
```

It leads to `Found multiple 'eval' methods which match the signature.`




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-07 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104614715
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -94,23 +94,28 @@ object UserDefinedFunctionUtils {
 val evalMethods = checkAndExtractEvalMethods(function)
 
 val filtered = evalMethods
-  // go over all eval methods and find one matching
-  .filter { cur =>
-  val signatures = cur.getParameterTypes
-  // match parameters of signature to actual parameters
-  (actualSignature.length >= signatures.length &&
-actualSignature.zipWithIndex.forall { case (clazz, i) =>
-  (i < signatures.length && parameterTypeEquals(clazz, 
signatures(i))) ||
-(i >= signatures.length - 1 && cur.isVarArgs &&
+  // go over all eval methods and filter out one and only one matching
+  .filter {
+case cur if !cur.isVarArgs =>
+  val signatures = cur.getParameterTypes
+  // match parameters of signature to actual parameters
+  actualSignature.length == signatures.length &&
+signatures.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz)
+  }
+case cur if cur.isVarArgs =>
+  val signatures = cur.getParameterTypes
+  actualSignature.zipWithIndex.forall { case (clazz, i) =>
+(i < signatures.length - 1 &&
+  parameterTypeEquals(clazz, signatures(i))) ||
+(i >= signatures.length - 1 &&
   parameterTypeEquals(clazz, signatures.last.getComponentType))
-}) ||
-// match empty variable arguments
-(actualSignature.length == signatures.length - 1 && cur.isVarArgs)
+  } || (actualSignature.isEmpty && signatures.length == 1)
--- End diff --

move this to a new line 




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-07 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104614583
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -94,23 +94,28 @@ object UserDefinedFunctionUtils {
 val evalMethods = checkAndExtractEvalMethods(function)
 
 val filtered = evalMethods
-  // go over all eval methods and find one matching
-  .filter { cur =>
-  val signatures = cur.getParameterTypes
-  // match parameters of signature to actual parameters
-  (actualSignature.length >= signatures.length &&
-actualSignature.zipWithIndex.forall { case (clazz, i) =>
-  (i < signatures.length && parameterTypeEquals(clazz, 
signatures(i))) ||
-(i >= signatures.length - 1 && cur.isVarArgs &&
+  // go over all eval methods and filter out one and only one matching
+  .filter {
+case cur if !cur.isVarArgs =>
+  val signatures = cur.getParameterTypes
+  // match parameters of signature to actual parameters
+  actualSignature.length == signatures.length &&
+signatures.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz)
+  }
+case cur if cur.isVarArgs =>
+  val signatures = cur.getParameterTypes
+  actualSignature.zipWithIndex.forall { case (clazz, i) =>
+(i < signatures.length - 1 &&
--- End diff --

We can also move the condition check on `i` into the case pattern to make the code clearer, like:
```
case (clazz, i) if i < signatures.length - 1 =>
  // ...
case (clazz, i) if i >= signatures.length - 1 =>
  // ...
```




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-07 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104609713
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
   operands: Seq[GeneratedExpression])
 : GeneratedExpression = {
 // determine function signature and result class
-val matchingSignature = getSignature(scalarFunction, signature)
+val matchingMethod = getEvalMethod(scalarFunction, signature)
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
+val matchingSignature = matchingMethod.getParameterTypes
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length
--- End diff --

Sure. I think there will be something similar in 
https://github.com/apache/flink/pull/3407 (FLINK-5882)




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-06 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104605350
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty &&
+signatures.last.getName.equals("scala.collection.Seq") &&
+// If users specified an @varargs, Scala will generate two 
methods indeed.
+// If there does not exists corresponding varargs method of 
the Seq method,
--- End diff --

Typo.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-06 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104600115
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
   operands: Seq[GeneratedExpression])
 : GeneratedExpression = {
 // determine function signature and result class
-val matchingSignature = getSignature(scalarFunction, signature)
+val matchingMethod = getEvalMethod(scalarFunction, signature)
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
+val matchingSignature = matchingMethod.getParameterTypes
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length
--- End diff --

Forget `zipAll`; I think you can write code like this instead of the while loop:
```
if (operands.length > matchingSignature.length) {
  operands.drop(matchingSignature.length).foreach(op =>
    paramToOperands = paramToOperands :+
      (matchingSignature.last.getComponentType, op))
}
```




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-06 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104597703
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -44,14 +44,24 @@ class ScalarFunctionCallGen(
   operands: Seq[GeneratedExpression])
 : GeneratedExpression = {
 // determine function signature and result class
-val matchingSignature = getSignature(scalarFunction, signature)
+val matchingMethod = getEvalMethod(scalarFunction, signature)
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
+val matchingSignature = matchingMethod.getParameterTypes
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length
--- End diff --

I think this while loop can be replaced by calling `zipAll`
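For reference, a minimal standalone sketch of what a `zipAll`-based pairing could look like, with plain placeholder types standing in for the codegen classes; the names mirror the diff above but the example itself is only an illustration.

```
object ZipAllSketch extends App {
  // Parameter classes of a hypothetical eval(String, Int...) and four actual
  // operands (strings standing in for GeneratedExpressions).
  val matchingSignature: Array[Class[_]] =
    Array(classOf[String], classOf[Array[Int]])
  val operands = Seq("op0", "op1", "op2", "op3")

  // zipAll pads the shorter side: once the declared parameters are used up,
  // every remaining operand is paired with the vararg component type, so no
  // explicit while loop is needed.
  val varargComponent = matchingSignature.last.getComponentType
  val paramToOperands =
    matchingSignature.toSeq.zipAll(operands, varargComponent, "unused")

  paramToOperands.foreach(println)
}
```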




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-06 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104595467
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -136,8 +143,18 @@ object ScalarSqlFunction {
   }
 
   override def getOperandCountRange: SqlOperandCountRange = {
-val signatureLengths = signatures.map(_.length)
-SqlOperandCountRanges.between(signatureLengths.min, 
signatureLengths.max)
+var min = 255
+var max = -1
+signatures.foreach(sig => {
+  var len = sig.length
+  if (len > 0 && sig(sig.length -1).isArray) {
--- End diff --

please add space before 1




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-06 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104584170
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  methods.foreach(method => {
--- End diff --

Can we move this check to a method like `verifyScalaVarargsAnnotation`?




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-06 Thread KurtYoung
Github user KurtYoung commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r104581688
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty &&
+signatures.last.getName.equals("scala.collection.Seq") &&
+// If users specified an @varargs, Scala will generate two 
methods indeed.
+// If there does not exists corresponding varargs method of 
the Seq method,
--- End diff --

If there does not exist a corresponding varargs annotation on the eval method




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-01 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103632353
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,26 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty &&
+signatures.last.getName.equals("scala.collection.Seq") &&
+// If users specified an @varargs, Scala will generate two 
methods indeed.
+// If there does not exists corresponding varargs method of 
the Seq method,
+// we will throw an ValidationException.
+(!methods.exists(m => {
+  val sigs = m.getParameterTypes
+  m.isVarArgs &&
+  sigs.length == signatures.length &&
+  sigs.zipWithIndex.forall { case (sig, i) =>
+ i == sigs.length - 1 || sig.equals(signatures(i))
+  }
+}))) {
+  throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

I've updated the patch and refactored the logic.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-03-01 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103632469
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -112,9 +112,16 @@ object ScalarSqlFunction {
   .getParameterTypes(foundSignature)
   .map(typeFactory.createTypeFromTypeInfo)
 
-inferredTypes.zipWithIndex.foreach {
-  case (inferredType, i) =>
-operandTypes(i) = inferredType
+operandTypes.zipWithIndex.foreach {
+  case (_, i) =>
+if (i < inferredTypes.length - 1) {
+  operandTypes(i) = inferredTypes(i)
+} else if (null != inferredTypes.last.getComponentType) {
+  // last arguments is a collection, the array type
+  operandTypes(i) = inferredTypes.last.getComponentType
+} else {
+  operandTypes(i) = inferredTypes.last
+}
--- End diff --

The logic here has also been changed.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-27 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103369181
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

I think one of the important things here is to check the types one by one.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103361639
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

If there are multiple eval methods found (not ambiguous), where one is varargs and the other is not, it seems that no exception is thrown to tell users that the non-varargs eval method does not work.

```
@varargs
def eval(args: String*): String = {...}
// no varargs annotation
def eval(args: Int*): Int = {...}
```




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-27 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r103366163
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -114,7 +114,9 @@ object ScalarSqlFunction {
 
 inferredTypes.zipWithIndex.foreach {
   case (inferredType, i) =>
-operandTypes(i) = inferredType
+if (operandTypes.length > 0) {
+  operandTypes(i) = inferredType
--- End diff --

If this is a varargs method, the inferredType is an array type. The operand type should be the component type of that array type, not the array type itself.

And the `operandTypes.length > 0` condition is still not safe: say the method is `eval(String a, int... b)` and the call is `eval("hello")`; then the length of `operandTypes` is 1, but the length of `inferredTypes` is 2. An IndexOutOfBoundsException would be thrown as before.
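
For illustration, a bounds-safe standalone sketch of the assignment described above, roughly mirroring the later revision quoted elsewhere in this thread; it uses plain placeholder strings instead of Calcite operand types, and all names are illustrative.

```
object OperandTypeSketch extends App {
  // Placeholder "types" for eval(String a, int... b); a real implementation
  // would work with Calcite RelDataTypes here.
  val inferredTypes = Array("STRING", "INT_ARRAY")
  def componentOf(t: String): Option[String] =
    if (t.endsWith("_ARRAY")) Some(t.stripSuffix("_ARRAY")) else None

  // The call site eval("hello") supplies only one operand.
  val operandTypes = new Array[String](1)

  // Iterating over operandTypes (not inferredTypes) never indexes past the
  // end of a short call; trailing positions take the vararg component type.
  operandTypes.indices.foreach { i =>
    operandTypes(i) =
      if (i < inferredTypes.length - 1) inferredTypes(i)
      else componentOf(inferredTypes.last).getOrElse(inferredTypes.last)
  }

  println(operandTypes.mkString(", ")) // prints: STRING
}
```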




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-23 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102864763
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +138,24 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var noVargs = true
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  if (method.isVarArgs) {
+noVargs = false
+  } else if 
(signatures.last.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  }
+}
+  })
+  if (trailingSeq && noVargs) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
--- End diff --

This is correct, because if multiple matching methods are found (overloads), another exception will be thrown.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-23 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102865002
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -78,20 +78,7 @@ object UserDefinedFunctionUtils {
   function: UserDefinedFunction,
   signature: Seq[TypeInformation[_]])
 : Option[Array[Class[_]]] = {
-// We compare the raw Java classes not the TypeInformation.
-// TypeInformation does not matter during runtime (e.g. within a 
MapFunction).
-val actualSignature = typeInfoToClass(signature)
-val signatures = getSignatures(function)
-
-signatures
-  // go over all signatures and find one matching actual signature
-  .find { curSig =>
-  // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
-curSig.zipWithIndex.forall { case (clazz, i) =>
-  parameterTypeEquals(actualSignature(i), clazz)
-}
-}
--- End diff --

I deleted them because both methods were essentially copy-and-paste duplicates. One was used for ScalarFunction, the other for TableFunction.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102632219
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

I will try to use `getComponentType()`




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102626945
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

Thanks, @wuchong. Yes, it's a mistake here, and the tests haven't covered this situation. Since the max number of arguments is 254, I don't think it is necessary to use a component type at the code generation phase. I will try to add some tests to cover this situation.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628732
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 ---
@@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   }
   
   @Test
+  def testVariableArgs(): Unit = {
+testAllApis(
--- End diff --

Sure, thanks @wuchong. As we discussed above, I will add more tests here.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628111
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
--- End diff --

If the user uses the annotation `@scala.annotation.varargs`, Scala will generate two signatures for the method: one is `T eval(scala.collection.Seq args)`, the other is `T eval(T[] args)`. A better idea is to compare every argument of the signature. That way we can make sure that either there is only one method `T eval(T[] args)`, or there are two methods, `T eval(scala.collection.Seq args)` and `T eval(T[] args)`.
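
To make that concrete, a small sketch (illustrative class name) of an annotated eval; the two bytecode-level signatures in the comment are roughly what `javap` would report.

```
import scala.annotation.varargs

// With @varargs, scalac emits two JVM-level eval methods for this class:
//   public java.lang.String eval(scala.collection.Seq)  // Scala-facing method
//   public java.lang.String eval(java.lang.String[])    // Java varargs bridge
// Reflection therefore sees both, which is why the check has to pair each
// Seq-taking method with a matching varargs method.
class ConcatAll {
  @varargs
  def eval(args: String*): String = args.mkString(", ")
}
```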




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102627293
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
--- End diff --

Thanks @wuchong. I will do some tests and revisions to handle this.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102627751
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
+  curSig.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz) ||
+(i == curSig.length - 1 && clazz.isArray)
--- End diff --

Thanks @wuchong. They are good suggestions.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102628665
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
+  "variable args eg. scala.collection.Seq or Type*, please add a 
@varargs annotation " +
--- End diff --

Sure. Thanks @wuchong 




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
Github user clarkyzl commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102483144
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -136,8 +136,18 @@ object ScalarSqlFunction {
   }
 
   override def getOperandCountRange: SqlOperandCountRange = {
-val signatureLengths = signatures.map(_.length)
-SqlOperandCountRanges.between(signatureLengths.min, 
signatureLengths.max)
+var min = 255
+var max = -1
+signatures.foreach(sig => {
+  var len = sig.length
+  if (len > 0 && sig(sig.length -1).isArray) {
+max = 254  // according to JVM spec 4.3.3
--- End diff --

Hi @wuchong. Yes, according to the JVM specification, a method may have 255 parameters if it is static. If the method is not static, the reference to "this" counts as one of the parameters, so the limit is 254.

FYI, 
http://docs.oracle.com/javase/specs/jvms/se7/html/jvms-4.html#jvms-4.3.3




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102448630
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/UserDefinedScalarFunctionTest.scala
 ---
@@ -181,6 +181,22 @@ class UserDefinedScalarFunctionTest extends 
ExpressionTestBase {
   }
   
   @Test
+  def testVariableArgs(): Unit = {
+testAllApis(
--- End diff --

Can you add a test to check whether a zero-argument call can pass?




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102475557
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarFunctionCallGen.scala
 ---
@@ -48,10 +48,16 @@ class ScalarFunctionCallGen(
   .getOrElse(throw new CodeGenException("No matching signature 
found."))
 val resultClass = getResultTypeClass(scalarFunction, matchingSignature)
 
+// zip for variable signatures
+var paramToOperands = matchingSignature.zip(operands)
+var i = paramToOperands.length
+while (i < operands.length) {
+  paramToOperands = paramToOperands :+ (matchingSignature.head, 
operands(i))
--- End diff --

Why take the head of `matchingSignature`? Isn't the vararg the last one of the signatures?

And I think we should take the component type of the vararg array type as the operand class.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102464697
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
--- End diff --

Varargs can be empty, so `actualSignature.length` could be `curSig.length - 1`.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102472512
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
--- End diff --

The `eval` method can be overloaded, such as:
```scala
def eval(args: Array[Int]): Int = {}
def eval(args: Int*): Int = {}
```
In this case, the methods can satisfy the condition, but the exception 
should be thrown.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102448156
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -140,6 +147,25 @@ object UserDefinedFunctionUtils {
   s"one method named 'eval' which is public, not abstract and " +
   s"(in case of table functions) not static.")
 } else {
+  var trailingSeq = false
+  var trailingArray = false
+  methods.foreach(method => {
+val signatures = method.getParameterTypes
+if (signatures.nonEmpty) {
+  val trailingArg = signatures(signatures.length - 1)
+  if (trailingArg.getName.equals("scala.collection.Seq")) {
+trailingSeq = true
+  } else if (trailingArg.isArray) {
+trailingArray = true
+  }
+}
+  })
+  if (trailingSeq && !trailingArray) {
+// We found trailing "scala.collection.Seq", but no trailing 
"Type[]", "Type..."
+throw new ValidationException("The 'eval' method do not support 
Scala type of " +
+  "variable args eg. scala.collection.Seq or Type*, please add a 
@varargs annotation " +
--- End diff --

Would changing `@varargs` to `@scala.annotation.varargs` be clearer?




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102453140
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -136,8 +136,18 @@ object ScalarSqlFunction {
   }
 
   override def getOperandCountRange: SqlOperandCountRange = {
-val signatureLengths = signatures.map(_.length)
-SqlOperandCountRanges.between(signatureLengths.min, 
signatureLengths.max)
+var min = 255
+var max = -1
+signatures.foreach(sig => {
+  var len = sig.length
+  if (len > 0 && sig(sig.length -1).isArray) {
+max = 254  // according to JVM spec 4.3.3
--- End diff --

So a varargs method can only have 254 parameters?




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102470461
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
+  curSig.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz) ||
+(i == curSig.length - 1 && clazz.isArray)
--- End diff --

Even if the last parameter type is an array type, it is possibly not a varargs method. For example, `public void eval(int[] arrays)` is not a varargs method but a regular one; we should add a test to check that this feature does not affect regular array-parameter methods.

The best way to check whether a method is varargs is to use `Method.isVarArgs()`.
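
For illustration, a standalone reflection sketch (illustrative class and method names): both methods end with an `Int[]` parameter at the JVM level, but only the generated varargs bridge reports `isVarArgs = true`.

```
import scala.annotation.varargs

class Udf {
  def evalArray(values: Array[Int]): Int = values.sum // plain array parameter
  @varargs
  def evalVar(values: Int*): Int = values.sum         // true varargs
}

object VarArgsCheck extends App {
  // evalArray -> false; the generated Int[]-taking evalVar bridge -> true
  // (the Seq-taking evalVar reports false as well).
  classOf[Udf].getMethods
    .filter(_.getName.startsWith("eval"))
    .foreach { m =>
      val params = m.getParameterTypes.map(_.getSimpleName).mkString(", ")
      println(s"${m.getName}($params): isVarArgs = ${m.isVarArgs}")
    }
}
```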




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread wuchong
Github user wuchong commented on a diff in the pull request:

https://github.com/apache/flink/pull/3389#discussion_r102465907
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/UserDefinedFunctionUtils.scala
 ---
@@ -87,10 +87,16 @@ object UserDefinedFunctionUtils {
   // go over all signatures and find one matching actual signature
   .find { curSig =>
   // match parameters of signature to actual parameters
-  actualSignature.length == curSig.length &&
+  (actualSignature.length == curSig.length &&
 curSig.zipWithIndex.forall { case (clazz, i) =>
   parameterTypeEquals(actualSignature(i), clazz)
-}
+}) ||
+// matching the style which last argument is variable, eg. 
"Type..." "Type*"
+(actualSignature.length >= curSig.length &&
+  curSig.zipWithIndex.forall { case (clazz, i) =>
+  parameterTypeEquals(actualSignature(i), clazz) ||
+(i == curSig.length - 1 && clazz.isArray)
--- End diff --

We should make sure that the component type of the array class is equal to the types of all of the last `actualSignature.length - curSig.length + 1` actual parameters.




[GitHub] flink pull request #3389: [FLINK-5881] [table] ScalarFunction(UDF) should su...

2017-02-22 Thread clarkyzl
GitHub user clarkyzl opened a pull request:

https://github.com/apache/flink/pull/3389

[FLINK-5881] [table] ScalarFunction(UDF) should support variable types and 
variable arguments

Type: New Feature
Priority: Major
Components: table, udf, ScalarFunction
Problem Definition: [FLINK-5881] [table] ScalarFunction(UDF) should support 
variable types and variable arguments
Design:
1. Modified the getSignature() method in UserDefinedFunctionUtils so that a trailing variable-arguments style can be matched. Signatures such as "(TypeA a, TypeB b, TypeC... c)" or "(a: TypeA, b: TypeB, c: TypeC*)" (with the annotation) will pass the method.
2. Modified the SqlOperandTypeChecker to make the count range of SQL operands flexible, so it will pass Calcite's SQL node validation.
3. Modified the checkAndExtractEvalMethods() method to throw a human-readable ValidationException if the user specified variable arguments in Scala and forgot to add the "@varargs" annotation. Please see the discussion in FLINK-5826.
Impact Analysis:
It's a minor modification and a new feature. Its impact on UDFs is minimal.
Test:
Added both Scala tests and Java tests for all APIs (a usage sketch follows below).
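
For illustration, a minimal sketch of the kind of varargs UDF this change enables, using Flink's `ScalarFunction` base class and the `@scala.annotation.varargs` annotation discussed above; the object name and logic are made up.

```
import org.apache.flink.table.functions.ScalarFunction
import scala.annotation.varargs

// Hypothetical varargs UDF: sums an arbitrary number of Int arguments.
// The @varargs annotation makes scalac also emit an Int[]-based eval method,
// which is the signature the Table API matches against.
object SumAll extends ScalarFunction {
  @varargs
  def eval(args: Int*): Int = args.sum
}
```

Once registered, such a function would be callable with any number of Int arguments from both the Table API and SQL.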

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/clarkyzl/flink flink-5881

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/3389.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3389


commit 60b68fdd66f8021f6f090e7372987d43362d5ef3
Author: Zhuoluo Yang 
Date:   2017-02-22T10:53:34Z

[FLINK-5881] [table] ScalarFunction(UDF) should support variable types and 
variable arguments



