[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5206

---
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5206#discussion_r158604394

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---
    @@ -153,15 +154,19 @@ object ScalarSqlFunction {
           override def getOperandCountRange: SqlOperandCountRange = {
             var min = 255
             var max = -1
    +        var isVarargs = false
             signatures.foreach( sig => {
    -          var len = sig.length
    -          if (len > 0 && sig(sig.length - 1).isArray) {
    -            max = 254  // according to JVM spec 4.3.3
    -            len = sig.length - 1
    +          var len = sig._2.length
    +          if (len > 0 && sig._1 && sig._2(sig._2.length - 1).isArray) {
    +            isVarargs = true
    +            len = sig._2.length - 1
    --- End diff --

    **Approach 1:** reuse `len` instead of recomputing the length:

        sig._2(sig._2.length - 1)  =>  sig._2(len - 1)
        len = sig._2.length - 1    =>  len = len - 1

    **Approach 2:** iterate over `methods` directly:

        methods.foreach( m => {
          var len = m.getParameterTypes.length
          if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
            isVarargs = true
            len = len - 1
          }
          max = Math.max(len, max)
          min = Math.min(len, min)
        })

    With approach 2 we can remove `val signatures = methods.map(m => m.isVarArgs -> m.getParameterTypes)`.

    What do you think?

---
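Approach 2 from the comment above can be tried outside Flink with plain Java reflection. The following is a minimal sketch, not the code that was merged: `OperandRangeSketch`, `operandRange` and `methodsNamed` are hypothetical names, overloads of `java.lang.String` methods stand in for a function's `eval` methods, and returning -1 as the maximum for varargs is an assumption taken from the pull request description ("set max length to -1").

```scala
import java.lang.reflect.Method

object OperandRangeSketch {

  /** Returns (min, max) operand counts over all overloads.
    * A max of -1 is used here to mean "no upper bound" (assumption based on the PR description).
    */
  def operandRange(methods: Seq[Method]): (Int, Int) = {
    var min = 255
    var max = -1
    var isVarargs = false
    methods.foreach { m =>
      var len = m.getParameterTypes.length
      // A varargs method compiles to a trailing array parameter; do not count it as required.
      if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
        isVarargs = true
        len = len - 1
      }
      max = Math.max(len, max)
      min = Math.min(len, min)
    }
    (min, if (isVarargs) -1 else max)
  }

  /** Hypothetical helper: all public overloads of `name` on `cls`. */
  def methodsNamed(cls: Class[_], name: String): Seq[Method] =
    cls.getMethods.filter(_.getName == name).toSeq

  def main(args: Array[String]): Unit = {
    // substring(int) and substring(int, int): fixed arity only.
    println(operandRange(methodsNamed(classOf[String], "substring")))
    // format(String, Object...) and format(Locale, String, Object...): both varargs.
    println(operandRange(methodsNamed(classOf[String], "format")))
  }
}
```

Checking `m.isVarArgs` in addition to `isArray` is what distinguishes a genuine varargs method from one that merely takes an array as its last parameter, which is the case the original `sig(sig.length - 1).isArray` test could not tell apart.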
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5206#discussion_r158603997

    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala ---
    @@ -153,15 +154,19 @@ object ScalarSqlFunction {
           override def getOperandCountRange: SqlOperandCountRange = {
             var min = 255
    --- End diff --

    `min = 254` is enough.

---
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5206#discussion_r158603987

    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -481,4 +484,34 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
       }

    +  @Test
    +  def testUDFWithLongVarargs(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    tEnv.registerFunction("varudf", VarUDF)
    +
    +    val parameters = (0 until 255).map(_ => "c").mkString(",")
    +    val sqlQuery = s"SELECT varudf($parameters) FROM T1"
    +
    +    val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
    +    tEnv.registerTable("T1", t1)
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List(
    +      "510",
    +      "1275",
    +      "2805")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +}
    +
    +object VarUDF extends ScalarFunction {
    --- End diff --

    I suggest using an existing test ScalarFunction, such as `org.apache.flink.table.expressions.utils.userDefinedScalarFunctions#Func15`, or moving `VarUDF` into `org.apache.flink.table.expressions.utils.userDefinedScalarFunctions`.

---
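The body of `VarUDF` is cut off in the diff above, so the following is only a sketch of one function that would be consistent with the expected values. It assumes that `VarUDF` sums the lengths of its string arguments and that column `c` of the small 3-tuple test data holds "Hi", "Hello" and "Hello world"; neither is shown in the diff. `VarargsSumSketch` and its `eval` are hypothetical, plain-Scala stand-ins with no Flink dependency.

```scala
object VarargsSumSketch {

  // Hypothetical stand-in for VarUDF.eval: sums the lengths of all arguments.
  def eval(args: String*): Int = args.map(_.length).sum

  def main(args: Array[String]): Unit = {
    // Assumed contents of column c in the small 3-tuple test data.
    val column = Seq("Hi", "Hello", "Hello world")
    // Mirror the SQL query, which passes the same column 255 times.
    val results = column.map(c => eval(Seq.fill(255)(c): _*))
    results.foreach(println)
  }
}
```

Under these assumptions each row yields 255 times the length of its string, which is why a call with 255 arguments, one more than a fixed-arity method can accept, is enough to exercise the varargs path.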
GitHub user Xpray opened a pull request:

    https://github.com/apache/flink/pull/5206

    [FLINK-8312][TableAPI && SQL] Fix ScalarFunction varargs length excee…

    ## What is the purpose of the change

    *Support more than 254 parameters with varargs for SQL*

    ## Brief change log

      - *If the scalar function is varargs, do not check the parameter length; set the max length to -1*

    ## Verifying this change

    This change added tests and can be verified as follows:

      - *SqlITCase.testUDFWithLongVarargs*

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): no
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no
      - The S3 file system connector: no

    ## Documentation

      - Does this pull request introduce a new feature? no

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Xpray/flink FLINK-8312

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5206

commit 4de1200791193163e5ade21cdfb1160e3894342d
Author: Xpray
Date:   2017-12-23T10:52:29Z

    [FLINK-8312][TableAPI && SQL] Fix ScalarFunction varargs length exceeds 254 for SQL

---