[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-29 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5206


---


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158604394
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -153,15 +154,19 @@ object ScalarSqlFunction {
   override def getOperandCountRange: SqlOperandCountRange = {
 var min = 255
 var max = -1
+var isVarargs = false
 signatures.foreach( sig => {
-  var len = sig.length
-  if (len > 0 && sig(sig.length - 1).isArray) {
-max = 254  // according to JVM spec 4.3.3
-len = sig.length - 1
+  var len = sig._2.length
+  if (len > 0 && sig._1 && sig._2(sig._2.length - 1).isArray) {
+isVarargs = true
+len = sig._2.length - 1
--- End diff --

**approach 1:**
sig._2.length - 1 => len -1 
len = sig._2.length - 1 => len-1

**approach2**
methods.foreach(
  m => {
var len = m.getParameterTypes.length
if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 
1).isArray) {
  isVarargs = true
  len = len - 1
}
max = Math.max(len, max)
min = Math.min(len, min)
  })

Using approach2 we can remove 
"val signatures = methods.map(m => m.isVarArgs -> m.getParameterTypes)"

What do you think?



---


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158603997
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -153,15 +154,19 @@ object ScalarSqlFunction {
   override def getOperandCountRange: SqlOperandCountRange = {
 var min = 255
--- End diff --

min=254 is enough.


---


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158603987
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,34 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUDFWithLongVarargs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+tEnv.registerFunction("varudf", VarUDF)
+
+val parameters = (0 until 255).map(_ => "c").mkString(",")
+val sqlQuery = s"SELECT varudf($parameters) FROM T1"
+
+val t1 = 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "510",
+  "1275",
+  "2805")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object VarUDF extends ScalarFunction {
--- End diff --

Suggest using exist test scalarFunction, such as : 
`org.apache.flink.table.expressions.utils.userDefinedScalarFunctions # 
Func15`
OR move the VarUDF into 
`org.apache.flink.table.expressions.utils.userDefinedScalarFunctions`


---


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-23 Thread Xpray
GitHub user Xpray opened a pull request:

https://github.com/apache/flink/pull/5206

[FLINK-8312][TableAPI && SQL] Fix ScalarFunction varargs length excee…



## What is the purpose of the change

*Support parameters exceeds 254 with Varargs for SQL*


## Brief change log

  - *if scalar function is varargs, do not check parameter length, set max 
length to -1*


## Verifying this change



This change added tests and can be verified as follows:

*(example:)*
  - *SqlITCase.testUDFWithLongVarargs*


## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xpray/flink FLINK-8312

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5206


commit 4de1200791193163e5ade21cdfb1160e3894342d
Author: Xpray 
Date:   2017-12-23T10:52:29Z

[FLINK-8312][TableAPI && SQL] Fix ScalarFunction varargs length exceeds 254 
for SQL




---