[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with the real IP when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303119#comment-16303119 ] ASF GitHub Bot commented on FLINK-8289: --- Github user shuai-xu commented on a diff in the pull request: https://github.com/apache/flink/pull/5190#discussion_r158627004 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java --- @@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) throws Exception { // write host information into configuration configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress()); configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort()); + configuration.setString(RestOptions.REST_ADDRESS, commonRpcService.getAddress()); --- End diff -- I think what we want to get from the RestServerEndpoint is its server address. One way is to let its bind address be the real IP of the machine. The common RPC address is now the real IP. > The RestServerEndpoint should return the address with the real IP when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > When RestServerEndpoint.getRestAddress is called, it returns the value of the config option rest.address (by default 127.0.0.1:9067), which cannot be accessed from another machine. The IPs for the Dispatcher and JobMaster are usually assigned dynamically, so users configure the address as 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067; this address is registered with YARN or Mesos but likewise cannot be accessed from another machine. The endpoint therefore needs to return the real ip:port so users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with the real IP when getRestAddress
[ https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303117#comment-16303117 ] shuai.xu commented on FLINK-8289: - The RestServerEndpoint should never know whether there is a proxy, so I think getRestAddress should always return its server address. There are two ways: 1. let the endpoint always return its true server address no matter what address it is bound to. 2. bind it to the IP of the machine. > The RestServerEndpoint should return the address with the real IP when > getRestAddress > -- > > Key: FLINK-8289 > URL: https://issues.apache.org/jira/browse/FLINK-8289 > Project: Flink > Issue Type: Bug >Affects Versions: 1.5.0 >Reporter: shuai.xu > Labels: flip-6 > > When RestServerEndpoint.getRestAddress is called, it returns the value of the config option rest.address (by default 127.0.0.1:9067), which cannot be accessed from another machine. The IPs for the Dispatcher and JobMaster are usually assigned dynamically, so users configure the address as 0.0.0.0, and getRestAddress then returns 0.0.0.0:9067; this address is registered with YARN or Mesos but likewise cannot be accessed from another machine. The endpoint therefore needs to return the real ip:port so users can access the web monitor from anywhere. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
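The "real IP" discussed above can be obtained by enumerating network interfaces and skipping loopback addresses. The following is a hedged, standalone sketch of that idea using only the JDK; it is not Flink's actual address-resolution logic (which lives in its RPC service), and the class name is illustrative:

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.util.Collections;

public class LocalAddressProbe {

    // Return the first non-loopback IPv4 address of this machine, falling back
    // to the loopback address if none is found (e.g. on an offline host).
    static InetAddress firstNonLoopbackAddress() throws SocketException {
        for (NetworkInterface nic : Collections.list(NetworkInterface.getNetworkInterfaces())) {
            for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                    return addr;
                }
            }
        }
        return InetAddress.getLoopbackAddress();
    }

    public static void main(String[] args) throws SocketException {
        // Unlike 0.0.0.0 or 127.0.0.1, this address is reachable from other machines.
        System.out.println("advertised address: " + firstNonLoopbackAddress().getHostAddress());
    }
}
```

An address found this way could be advertised to YARN or Mesos instead of the bind address, which is the gist of option 2 in the comment above.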
[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL
[ https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303020#comment-16303020 ] ASF GitHub Bot commented on FLINK-8312: --- Github user Xpray commented on the issue: https://github.com/apache/flink/pull/5206 Hi, @sunjincheng121 , thanks for the comments, I've updated this PR. > Fix ScalarFunction varargs length exceeds 254 for SQL > - > > Key: FLINK-8312 > URL: https://issues.apache.org/jira/browse/FLINK-8312 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > With varargs, the Table API can correctly handle scalar function calls with more than 254 parameters. > This issue intends to support long parameter lists for SQL as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8043) change fullRestarts (for fine grained recovery) from gauge to counter
[ https://issues.apache.org/jira/browse/FLINK-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Steven Zhen Wu updated FLINK-8043: -- Description: Fine grained recovery publishes fullRestarts as a gauge, which is not suitable for threshold-based alerting. Usually we would alert like "fullRestarts > 0 happens 10 times in last 15 minutes". In comparison, "task_failures" is published as a counter. was: When fine grained recovery fails (e.g. due to not enough taskmanager slots when the replacement taskmanager node didn't come back in time), Flink will revert to a full job restart. In this case, it should also increment the "job restart" metric. Summary: change fullRestarts (for fine grained recovery) from gauge to counter (was: increment job restart metric when fine grained recovery reverted to full job restart) > change fullRestarts (for fine grained recovery) from gauge to counter > - > > Key: FLINK-8043 > URL: https://issues.apache.org/jira/browse/FLINK-8043 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.3.2 >Reporter: Steven Zhen Wu > > Fine grained recovery publishes fullRestarts as a gauge, which is not suitable for threshold-based alerting. Usually we would alert like "fullRestarts > 0 happens 10 times in last 15 minutes". > In comparison, "task_failures" is published as a counter. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
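The gauge-vs-counter distinction above matters because alerting systems derive windowed rates from monotonically increasing counters. A minimal sketch of the counter semantics in plain Java (this is not Flink's metrics API; the names are illustrative):

```java
import java.util.concurrent.atomic.AtomicLong;

public class RestartMetrics {

    // Counter semantics: a monotonically increasing total. An alerting system
    // can derive "restarts in the last 15 minutes" from two scrapes of it.
    static final AtomicLong fullRestartsCounter = new AtomicLong();

    static void onFullRestart() {
        fullRestartsCounter.incrementAndGet();
    }

    // With a counter, the count over a window is simply the difference
    // between two successive scrapes of the running total.
    static long restartsBetween(long earlierSample, long laterSample) {
        return laterSample - earlierSample;
    }

    public static void main(String[] args) {
        long before = fullRestartsCounter.get();
        onFullRestart();
        onFullRestart();
        long after = fullRestartsCounter.get();
        System.out.println("restarts in window: " + restartsBetween(before, after)); // 2
    }
}
```

A gauge, by contrast, only reports the current value at sample time, so intermediate restarts between scrapes are invisible to the alerting backend.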
[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL
[ https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302866#comment-16302866 ] ASF GitHub Bot commented on FLINK-8312: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5206#discussion_r158603997 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala --- @@ -153,15 +154,19 @@ object ScalarSqlFunction { override def getOperandCountRange: SqlOperandCountRange = { var min = 255 --- End diff -- min=254 is enough. > Fix ScalarFunction varargs length exceeds 254 for SQL > - > > Key: FLINK-8312 > URL: https://issues.apache.org/jira/browse/FLINK-8312 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > With varargs, the Table API can correctly handle scalar function calls with more than 254 parameters. > This issue intends to support long parameter lists for SQL as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL
[ https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302867#comment-16302867 ] ASF GitHub Bot commented on FLINK-8312: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5206#discussion_r158603987 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -481,4 +484,34 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted) } + @Test + def testUDFWithLongVarargs(): Unit = { +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +tEnv.registerFunction("varudf", VarUDF) + +val parameters = (0 until 255).map(_ => "c").mkString(",") +val sqlQuery = s"SELECT varudf($parameters) FROM T1" + +val t1 = StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c) +tEnv.registerTable("T1", t1) + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "510", + "1275", + "2805") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + +} + +object VarUDF extends ScalarFunction { --- End diff -- Suggest using exist test scalarFunction, such as : `org.apache.flink.table.expressions.utils.userDefinedScalarFunctions # Func15` OR move the VarUDF into `org.apache.flink.table.expressions.utils.userDefinedScalarFunctions` > Fix ScalarFunction varargs length exceeds 254 for SQL > - > > Key: FLINK-8312 > URL: https://issues.apache.org/jira/browse/FLINK-8312 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > With Varargs, TableAPI can handle scalar function call with parameters > exceeds 254 correctly. 
> This issue intends to support long parameter lists for SQL as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL
[ https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302868#comment-16302868 ] ASF GitHub Bot commented on FLINK-8312: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5206#discussion_r158604394 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala --- @@ -153,15 +154,19 @@ object ScalarSqlFunction { override def getOperandCountRange: SqlOperandCountRange = { var min = 255 var max = -1 +var isVarargs = false signatures.foreach( sig => { - var len = sig.length - if (len > 0 && sig(sig.length - 1).isArray) { -max = 254 // according to JVM spec 4.3.3 -len = sig.length - 1 + var len = sig._2.length + if (len > 0 && sig._1 && sig._2(sig._2.length - 1).isArray) { +isVarargs = true +len = sig._2.length - 1 --- End diff -- **approach 1:** sig._2.length - 1 => len - 1 len = sig._2.length - 1 => len - 1 **approach 2:** methods.foreach( m => { var len = m.getParameterTypes.length if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) { isVarargs = true len = len - 1 } max = Math.max(len, max) min = Math.min(len, min) }) Using approach 2 we can remove "val signatures = methods.map(m => m.isVarArgs -> m.getParameterTypes)" What do you think? > Fix ScalarFunction varargs length exceeds 254 for SQL > - > > Key: FLINK-8312 > URL: https://issues.apache.org/jira/browse/FLINK-8312 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > With varargs, the Table API can correctly handle scalar function calls with more than 254 parameters. > This issue intends to support long parameter lists for SQL as well. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
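The "approach 2" suggested in the review can be sketched independently of Flink's classes: derive the operand count range directly from the reflected methods, treating a trailing varargs parameter as open-ended. The class and method names below are illustrative, not Flink's actual code:

```java
import java.lang.reflect.Method;

public class OperandRange {

    // Compute [min, max] argument counts over a set of overloads.
    // max == -1 signals "unbounded" (a varargs overload exists); the 254 bound
    // discussed in the review comes from the JVM's 255-slot method parameter
    // limit (JVMS section 4.3.3).
    static int[] operandCountRange(Method[] methods) {
        int min = 254;
        int max = -1;
        boolean isVarargs = false;
        for (Method m : methods) {
            int len = m.getParameterTypes().length;
            if (len > 0 && m.isVarArgs()) {
                isVarargs = true;
                len = len - 1; // the trailing array absorbs any number of arguments
            }
            max = Math.max(len, max);
            min = Math.min(len, min);
        }
        return new int[] {min, isVarargs ? -1 : max};
    }

    // Example overloads: one fixed-arity, one varargs.
    public static int eval(int a) { return a; }
    public static int eval(int a, String... rest) { return a + rest.length; }

    public static void main(String[] args) {
        Method[] evals = java.util.Arrays.stream(OperandRange.class.getMethods())
            .filter(m -> m.getName().equals("eval"))
            .toArray(Method[]::new);
        int[] range = operandCountRange(evals);
        System.out.println("min=" + range[0] + ", max=" + range[1]); // min=1, max=-1
    }
}
```

This mirrors why the reviewer could drop the intermediate `signatures` value: `Method` already carries both the varargs flag and the parameter types.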
[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302839#comment-16302839 ] ASF GitHub Bot commented on FLINK-8301: --- Github user Xpray commented on the issue: https://github.com/apache/flink/pull/5203 Thanks to @sunjincheng121 , the PR has been updated. > Support Unicode in codegen for SQL && TableAPI > -- > > Key: FLINK-8301 > URL: https://issues.apache.org/jira/browse/FLINK-8301 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Ruidong Li >Assignee: Ruidong Li > > The current code generation does not support Unicode: "\u0001" is generated as "\\u0001", so a function call like concat(str, "\u0001") leads to a wrong result. > This issue intends to handle char/varchar literals correctly; some examples follow below. > literal: '\u0001abc' -> codegen: "\u0001abc" > literal: '\u0022\' -> codegen: "\"\\" -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reassigned FLINK-8317: --- Assignee: Gary Yao > Enable Triggering of Savepoints via RestfulGateway > -- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: > * Add method to {{CompletableFuture > triggerSavepoint(long timestamp, String targetDirectory)}} to interface > * Implement method in {{Dispatcher}} and {{JobMaster}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8317: Description: Enable triggering of savepoints via RestfulGateway: * Add method to {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to interface * Implement method in {{Dispatcher}} and {{JobMaster}} was:Enable triggering of savepoints via RestfulGateway. Summary: Enable Triggering of Savepoints via RestfulGateway (was: Enable triggering of Savepoints via RestfulGateway) > Enable Triggering of Savepoints via RestfulGateway > -- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints via RestfulGateway: > * Add method to {{CompletableFuture > triggerSavepoint(long timestamp, String targetDirectory)}} to interface > * Implement method in {{Dispatcher}} and {{JobMaster}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8317: Description: Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: * Add method to {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to interface * Implement method in {{Dispatcher}} and {{JobMaster}} was: Enable triggering of savepoints via RestfulGateway: * Add method to {{CompletableFuture triggerSavepoint(long timestamp, String targetDirectory)}} to interface * Implement method in {{Dispatcher}} and {{JobMaster}} > Enable Triggering of Savepoints via RestfulGateway > -- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints in FLIP-6 mode via RestfulGateway: > * Add method to {{CompletableFuture > triggerSavepoint(long timestamp, String targetDirectory)}} to interface > * Implement method in {{Dispatcher}} and {{JobMaster}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-8317) Enable triggering of Savepoints via RestfulGateway
[ https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-8317: Description: Enable triggering of savepoints via RestfulGateway. (was: Enable triggering of savepoints via HTTP: * It must be possible to trigger savepoints if the Dispatcher is absent (job mode) * ) Summary: Enable triggering of Savepoints via RestfulGateway (was: Expose met triggering of Savepoints via HTTP) > Enable triggering of Savepoints via RestfulGateway > -- > > Key: FLINK-8317 > URL: https://issues.apache.org/jira/browse/FLINK-8317 > Project: Flink > Issue Type: New Feature > Components: Distributed Coordination, REST >Affects Versions: 1.5.0 >Reporter: Gary Yao > Labels: flip-6 > Fix For: 1.5.0 > > > Enable triggering of savepoints via RestfulGateway. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-8317) Expose met triggering of Savepoints via HTTP
Gary Yao created FLINK-8317: --- Summary: Expose met triggering of Savepoints via HTTP Key: FLINK-8317 URL: https://issues.apache.org/jira/browse/FLINK-8317 Project: Flink Issue Type: New Feature Components: Distributed Coordination, REST Affects Versions: 1.5.0 Reporter: Gary Yao Fix For: 1.5.0 Enable triggering of savepoints via HTTP: * It must be possible to trigger savepoints if the Dispatcher is absent (job mode) * -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7780) Integrate savepoint command into REST client
[ https://issues.apache.org/jira/browse/FLINK-7780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-7780. --- Resolution: Fixed Re-opened to edit the labels. > Integrate savepoint command into REST client > > > Key: FLINK-7780 > URL: https://issues.apache.org/jira/browse/FLINK-7780 > Project: Flink > Issue Type: Sub-task > Components: Client, REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7780) Integrate savepoint command into REST client
[ https://issues.apache.org/jira/browse/FLINK-7780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao updated FLINK-7780: Labels: flip-6 (was: ) > Integrate savepoint command into REST client > > > Key: FLINK-7780 > URL: https://issues.apache.org/jira/browse/FLINK-7780 > Project: Flink > Issue Type: Sub-task > Components: Client, REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Reopened] (FLINK-7780) Integrate savepoint command into REST client
[ https://issues.apache.org/jira/browse/FLINK-7780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao reopened FLINK-7780: - Assignee: Gary Yao (was: Chesnay Schepler) > Integrate savepoint command into REST client > > > Key: FLINK-7780 > URL: https://issues.apache.org/jira/browse/FLINK-7780 > Project: Flink > Issue Type: Sub-task > Components: Client, REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Gary Yao >Priority: Blocker > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4834) Implement unified High Availability Services Abstraction
[ https://issues.apache.org/jira/browse/FLINK-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302812#comment-16302812 ] Gary Yao commented on FLINK-4834: - All subtasks are done. Can this be closed? > Implement unified High Availability Services Abstraction > > > Key: FLINK-4834 > URL: https://issues.apache.org/jira/browse/FLINK-4834 > Project: Flink > Issue Type: New Feature > Components: Cluster Management > Environment: FLIP-6 feature branch >Reporter: Stephan Ewen > Labels: flip-6 > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8231) Extend AbstractRestHandler to return response headers
[ https://issues.apache.org/jira/browse/FLINK-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gary Yao closed FLINK-8231. --- Resolution: Won't Do Not needed for now. > Extend AbstractRestHandler to return response headers > - > > Key: FLINK-8231 > URL: https://issues.apache.org/jira/browse/FLINK-8231 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Till Rohrmann > Labels: flip-6 > Fix For: 1.5.0 > > > The {{AbstractRestHandler}} should be able to return a set of response > headers in case that it creates a new resource (POST call) and wants to set > the redirection header, for example. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling
[ https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302807#comment-16302807 ] ASF GitHub Bot commented on FLINK-8299: --- GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/5207 [FLINK-8299][flip6] Poll JobExecutionResult after job submission ## What is the purpose of the change *Poll JobExecutionResult after job submission. This is needed, for example, to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on #5194.* CC: @tillrohrmann ## Brief change log - *Retrieve JobExecutionResult after job submission in `RestClusterClient`* ## Verifying this change This change added tests and can be verified as follows: - *Added unit tests for all new classes and changed classes.* - *Manually run job in examples/batch/WordCount.jar and verified that the results are correctly collected/printed.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-8299 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5207.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5207 commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d Author: gyaoDate: 2017-12-19T17:58:53Z [FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher - Introduce new JobExecutionResult used by JobMaster to forward the information in the already existing JobExecutionResult. - Always cache a JobExecutionResult. Even in case of job failures. In case of job failures, the serialized exception is stored additionally. - Introduce new methods to RestfulGateway to allow retrieval of cached JobExecutionResults commit 748745ac3521a20040cbda4056dfd9c53bc24a82 Author: gyao Date: 2017-12-20T13:44:03Z [FLINK-8233][flip6] Add JobExecutionResultHandler - Allow retrieval of the JobExecutionResult cached in Dispatcher. - Implement serializer and deserializer for JobExecutionResult. commit adf091a2770f42d6f8a0c19ab88cc7a208943a32 Author: gyao Date: 2017-12-20T13:44:26Z [hotfix] Clean up ExecutionGraph - Remove unnecessary throws clause. - Format whitespace. commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c Author: gyao Date: 2017-12-22T23:02:10Z [FLINK-8299][flip6] Retrieve JobExecutionResult after job submission > Retrieve ExecutionResult by REST polling > > > Key: FLINK-8299 > URL: https://issues.apache.org/jira/browse/FLINK-8299 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.5.0 >Reporter: Gary Yao >Assignee: Gary Yao >Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > Retrieve the {{ExecutionResult}} from a finished Flink job via the > {{RestClusterClient}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
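Conceptually, retrieving the `JobExecutionResult` "by REST polling" boils down to a retry loop around a request that may not yet have an answer. A generic, hypothetical sketch of such a loop (not the actual `RestClusterClient` code; `fetch` stands in for the REST call):

```java
import java.util.Optional;
import java.util.concurrent.Callable;

public class ResultPoller {

    // Poll `fetch` until it yields a value, sleeping between attempts.
    // An empty Optional models a REST response saying the job is still running.
    static <T> T pollUntilPresent(Callable<Optional<T>> fetch,
                                  long intervalMillis,
                                  int maxAttempts) throws Exception {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            Optional<T> result = fetch.call();
            if (result.isPresent()) {
                return result.get();
            }
            Thread.sleep(intervalMillis);
        }
        throw new IllegalStateException("no result after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) throws Exception {
        // Simulated endpoint: empty on the first two polls, then a value.
        int[] calls = {0};
        String result = pollUntilPresent(
            () -> ++calls[0] < 3 ? Optional.empty() : Optional.of("FINISHED"),
            1, 10);
        System.out.println(result); // FINISHED
    }
}
```

Polling like this is what lets a blocking client call such as `collect()` appear synchronous on top of an asynchronous REST API.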
[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302803#comment-16302803 ] ASF GitHub Bot commented on FLINK-8301: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5203#discussion_r158598514 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala --- @@ -352,6 +353,72 @@ class CalcITCase( val results = result.toDataSet[Row].collect() TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testDeterministicUdfWithUnicodeParameter(): Unit = { +val data = new mutable.MutableList[(String, String, String)] +data.+=((null, null, null)) + +val env = ExecutionEnvironment.getExecutionEnvironment + +val tEnv = TableEnvironment.getTableEnvironment(env) + +val udf0 = new LiteralUDF("\"\\", deterministic = true) +val udf1 = new LiteralUDF("\u0001xyz", deterministic = true) +val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true) + +tEnv.registerFunction("udf0", udf0) +tEnv.registerFunction("udf1", udf1) +tEnv.registerFunction("udf2", udf2) + +// user have to specify '\' with '\\' in SQL +val sqlQuery = "SELECT " + + "udf0('\"') as str1, " + + "udf1('\u0001xyz') as str2, " + + "udf2('\u0001\u0012') as str3 from T1" + +val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3) + +tEnv.registerTable("T1", t1) + +val results = tEnv.sql(sqlQuery).toDataSet[Row].collect() + +val expected = List("\"\\,\u0001xyz,\u0001\u0012").mkString("\n") +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testNonDeterministicUdfWithUnicodeParameter(): Unit = { --- End diff -- To reduce IT test time, I suggest merging "testDeterministicUdfWithUnicodeParameter" and "testNonDeterministicUdfWithUnicodeParameter" into one test case, i.e. we create two instances that differ only in their deterministic value,
something as follows: ` val udf00 = new LiteralUDF("\"\\", deterministic = false) val udf01 = new LiteralUDF("\"\\", deterministic = true) ... ` > Support Unicode in codegen for SQL && TableAPI > -- > > Key: FLINK-8301 > URL: https://issues.apache.org/jira/browse/FLINK-8301 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL > Reporter: Ruidong Li > Assignee: Ruidong Li > > The current code generation does not support Unicode: "\u0001" will be generated as "\\u0001", and a function call like concat(str, "\u0001") will lead to a wrong result. > This issue intends to handle char/varchar literals correctly; some examples follow below. > literal: '\u0001abc' -> codegen: "\u0001abc" > literal: '\u0022\' -> codegen: "\"\\" -- This message was sent by Atlassian JIRA (v6.4.14#64029)
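The escaping rule described in the issue can be illustrated with a small standalone helper. This is an illustrative sketch only, not Flink's actual code generator: backslash and double quote must be escaped in the generated Java string literal, while other Unicode characters (including control characters such as \u0001) pass through unchanged instead of being re-escaped to "\\u0001".

```scala
// Illustrative sketch of the literal-escaping rule from the issue description;
// not Flink's actual codegen.
object LiteralEscaping {
  def toJavaStringLiteral(s: String): String = {
    val body = s.flatMap {
      case '\\' => "\\\\"      // backslash  ->  \\
      case '"'  => "\\\""      // quote      ->  \"
      case c    => c.toString  // \u0001 etc. pass through unchanged
    }
    "\"" + body + "\""
  }
}
```

Under this rule the literal '\u0001abc' becomes the generated literal "\u0001abc", and '\u0022\' (a double quote followed by a backslash) becomes "\"\\", matching the two examples in the issue description.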
[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302801#comment-16302801 ] ASF GitHub Bot commented on FLINK-8301: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5203#discussion_r158598528 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala --- @@ -541,6 +541,48 @@ class CalcITCase( "default-nosharp,Sunny-nosharp,kevin2-nosharp" TestBaseUtils.compareResultAsText(results.asJava, expected) } + + @Test + def testDeterministicUDFWithUnicodeParameter(): Unit = { +val data = List( + ("a\u0001b", "c\"d", "e\\\"\u0004f"), + ("x\u0001y", "y\"z", "z\\\"\u0004z") +) +val env = ExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +val splitUDF = new SplitUDF(deterministic = true) +val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) + .select(splitUDF('a, "\u0001", 0) as 'a, + splitUDF('b, "\"", 1) as 'b, + splitUDF('c, "\\\"\u0004", 0) as 'c + ) +val results = ds.collect() +val expected = List( + "a,d,e", "x,z,z" +).mkString("\n") +TestBaseUtils.compareResultAsText(results.asJava, expected) + } + + @Test + def testNonDeterministicUDFWithUnicodeParameter(): Unit = { +val data = List( --- End diff -- Same suggestion as above.
[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302802#comment-16302802 ] ASF GitHub Bot commented on FLINK-8301: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5203#discussion_r158598552 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -481,4 +484,84 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted) } + @Test + def testDeterministicUdfWithUnicodeParameter(): Unit = { +val data = new mutable.MutableList[(String, String, String)] +data.+=((null, null, null)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment + +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.clear + +val udf0 = new LiteralUDF("\"\\", deterministic = true) +val udf1 = new LiteralUDF("\u0001xyz", deterministic = true) +val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true) + +tEnv.registerFunction("udf0", udf0) +tEnv.registerFunction("udf1", udf1) +tEnv.registerFunction("udf2", udf2) + +// user have to specify '\' with '\\' in SQL +val sqlQuery = "SELECT " + + "udf0('\"') as str1, " + + "udf1('\u0001xyz') as str2, " + + "udf2('\u0001\u0012') as str3 from T1" + +val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3) + +tEnv.registerTable("T1", t1) + +val result = tEnv.sql(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List("\"\\,\u0001xyz,\u0001\u0012") +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNonDeterministicUdfWithUnicodeParameter(): Unit = { +val data = new mutable.MutableList[(String, String, String)] --- End diff -- Same suggestion as above.
[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI
[ https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302804#comment-16302804 ] ASF GitHub Bot commented on FLINK-8301: --- Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5203#discussion_r158598561 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala --- @@ -352,4 +354,64 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { "{9=Comment#3}") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testDeterministicUDFWithUnicodeParameter(): Unit = { +val data = List( + ("a\u0001b", "c\"d", "e\\\"\u0004f"), + ("x\u0001y", "y\"z", "z\\\"\u0004z") +) +val env = StreamExecutionEnvironment.getExecutionEnvironment +val tEnv = TableEnvironment.getTableEnvironment(env) +StreamITCase.testResults = mutable.MutableList() +val splitUDF = new SplitUDF(deterministic = true) +val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) + .select(splitUDF('a, "\u0001", 0) as 'a, + splitUDF('b, "\"", 1) as 'b, + splitUDF('c, "\\\"\u0004", 0) as 'c + ) +val results = ds.toAppendStream[Row] +results.addSink(new StreamITCase.StringSink[Row]) +env.execute() +val expected = mutable.MutableList( + "a,d,e", "x,z,z" +) +assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNonDeterministicUDFWithUnicodeParameter(): Unit = { +val data = List( --- End diff -- Same suggestion as above.
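The reviewer's suggestion, covering the deterministic and the non-deterministic codegen path in a single test case, can be sketched in isolation. `LiteralUDF` here is a stand-in case class used purely for illustration, not Flink's actual test UDF:

```scala
// Hypothetical outline of merging the two @Test methods: build both UDF
// variants from a single list of determinism flags and register them under
// distinct names, instead of duplicating the whole test body.
case class LiteralUDF(literal: String, deterministic: Boolean) // stand-in

object MergedTestSketch {
  def buildUdfs(literal: String): Seq[(String, LiteralUDF)] =
    Seq(false, true).zipWithIndex.map { case (det, i) =>
      // udf00 is the non-deterministic instance, udf01 the deterministic one
      (s"udf0$i", LiteralUDF(literal, deterministic = det))
    }
}
```

A single test method would then register every (name, udf) pair with the table environment and run one query that exercises both code paths, halving the IT-case setup cost.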