[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303119#comment-16303119
 ] 

ASF GitHub Bot commented on FLINK-8289:
---

Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5190#discussion_r158627004
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
@@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) 
throws Exception {
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
+   configuration.setString(RestOptions.REST_ADDRESS, 
commonRpcService.getAddress());
--- End diff --

I think what we want to get from the RestServerEndpoint is its server 
address. One way is to let its bind address be the real IP of the machine. 
The common RPC address is now the real IP. 


> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns an address equal to the 
> value of the config option rest.address, by default 127.0.0.1:9067, but this 
> address cannot be accessed from another machine. The IPs for the Dispatcher 
> and JobMaster are usually assigned dynamically, so users configure it to 
> 0.0.0.0, in which case getRestAddress returns 0.0.0.0:9067. This address is 
> registered with YARN or Mesos, but it cannot be accessed from another machine 
> either. getRestAddress therefore needs to return the real ip:port so users 
> can access the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5190: [FLINK-8289] [runtime] set the rest.address to the...

2017-12-24 Thread shuai-xu
Github user shuai-xu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5190#discussion_r158627004
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/ClusterEntrypoint.java
 ---
@@ -155,6 +156,7 @@ protected void runCluster(Configuration configuration) 
throws Exception {
// write host information into configuration
configuration.setString(JobManagerOptions.ADDRESS, 
commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, 
commonRpcService.getPort());
+   configuration.setString(RestOptions.REST_ADDRESS, 
commonRpcService.getAddress());
--- End diff --

I think what we want to get from the RestServerEndpoint is its server 
address. One way is to let its bind address be the real IP of the machine. 
The common RPC address is now the real IP. 


---


[jira] [Commented] (FLINK-8289) The RestServerEndpoint should return the address with real ip when getRestAdddress

2017-12-24 Thread shuai.xu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8289?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303117#comment-16303117
 ] 

shuai.xu commented on FLINK-8289:
-

The RestServerEndpoint should never need to know whether there is a proxy, so 
I think getRestAddress should always return its own server address. 
There are two ways: 1. let the endpoint always return its true server address 
no matter what address it is bound to; 2. bind it to the IP of the machine. 
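
A minimal sketch of the second option, resolving a non-loopback address of the 
machine instead of 0.0.0.0/127.0.0.1. This is illustrative only and not code 
from the PR; `HostAddressResolver` is a hypothetical helper built on standard 
`java.net` APIs:

```scala
import java.net.{InetAddress, NetworkInterface}
import scala.collection.JavaConverters._

object HostAddressResolver {
  // Returns the first non-loopback IPv4 address of this machine, falling
  // back to InetAddress.getLocalHost if no such interface address exists.
  def findReachableAddress(): String = {
    val candidates = for {
      iface <- NetworkInterface.getNetworkInterfaces.asScala
      if iface.isUp && !iface.isLoopback
      addr  <- iface.getInetAddresses.asScala
      if !addr.isLoopbackAddress && addr.getAddress.length == 4 // IPv4 only
    } yield addr.getHostAddress
    candidates.toSeq.headOption.getOrElse(InetAddress.getLocalHost.getHostAddress)
  }
}
```

A RestServerEndpoint bound to such an address would report an address that 
other machines can actually reach.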

> The RestServerEndpoint should return the address with real ip when 
> getRestAdddress
> --
>
> Key: FLINK-8289
> URL: https://issues.apache.org/jira/browse/FLINK-8289
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.5.0
>Reporter: shuai.xu
>  Labels: flip-6
>
> Currently, RestServerEndpoint.getRestAddress returns an address equal to the 
> value of the config option rest.address, by default 127.0.0.1:9067, but this 
> address cannot be accessed from another machine. The IPs for the Dispatcher 
> and JobMaster are usually assigned dynamically, so users configure it to 
> 0.0.0.0, in which case getRestAddress returns 0.0.0.0:9067. This address is 
> registered with YARN or Mesos, but it cannot be accessed from another machine 
> either. getRestAddress therefore needs to return the real ip:port so users 
> can access the web monitor from anywhere.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction varargs ...

2017-12-24 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5206
  
Hi @sunjincheng121, thanks for the comments, I've updated this PR.


---


[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16303020#comment-16303020
 ] 

ASF GitHub Bot commented on FLINK-8312:
---

Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5206
  
Hi @sunjincheng121, thanks for the comments, I've updated this PR.


> Fix ScalarFunction varargs length exceeds 254 for SQL
> -
>
> Key: FLINK-8312
> URL: https://issues.apache.org/jira/browse/FLINK-8312
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> With varargs, the Table API can correctly handle scalar function calls whose 
> parameter count exceeds 254.
> This issue intends to add the same support for SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8043) change fullRestarts (for fine grained recovery) from gauge to counter

2017-12-24 Thread Steven Zhen Wu (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8043?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Steven Zhen Wu updated FLINK-8043:
--
Description: 
Fine-grained recovery publishes fullRestarts as a gauge, which is not suitable 
for threshold-based alerting. Usually we would alert on something like 
"fullRestarts > 0 happens 10 times in the last 15 minutes".

In comparison, "task_failures" is published as a counter.

  was:When fine-grained recovery fails (e.g. due to not enough TaskManager 
slots when the replacement TaskManager node didn't come back in time), Flink 
reverts to a full job restart. In this case, it should also increment the "job 
restart" metric

Summary: change fullRestarts (for fine grained recovery) from gauge to 
counter  (was: increment job restart metric when fine grained recovery reverted 
to full job restart)
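
A hedged sketch of the difference, using Flink's metrics API 
(`MetricGroup.counter` and `MetricGroup.gauge` both exist; the wrapper class 
names here are hypothetical):

```scala
import org.apache.flink.metrics.{Counter, Gauge, MetricGroup}

// As a counter, each full restart is an increment that a metrics backend
// can window and rate over, enabling alerts such as
// "fullRestarts > 0 happens 10 times in the last 15 minutes".
class RestartMetrics(metrics: MetricGroup) {
  private val fullRestarts: Counter = metrics.counter("fullRestarts")
  def onFullRestart(): Unit = fullRestarts.inc()
}

// The gauge variant only exposes the current total, which threshold-based
// alerting systems generally cannot window reliably.
class RestartGauge(metrics: MetricGroup, currentTotal: () => Long) {
  metrics.gauge[Long, Gauge[Long]]("fullRestarts", new Gauge[Long] {
    override def getValue: Long = currentTotal()
  })
}
```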

> change fullRestarts (for fine grained recovery) from gauge to counter
> -
>
> Key: FLINK-8043
> URL: https://issues.apache.org/jira/browse/FLINK-8043
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.3.2
>Reporter: Steven Zhen Wu
>
> Fine-grained recovery publishes fullRestarts as a gauge, which is not 
> suitable for threshold-based alerting. Usually we would alert on something 
> like "fullRestarts > 0 happens 10 times in the last 15 minutes".
> In comparison, "task_failures" is published as a counter.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302866#comment-16302866
 ] 

ASF GitHub Bot commented on FLINK-8312:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158603997
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -153,15 +154,19 @@ object ScalarSqlFunction {
   override def getOperandCountRange: SqlOperandCountRange = {
 var min = 255
--- End diff --

min=254 is enough.


> Fix ScalarFunction varargs length exceeds 254 for SQL
> -
>
> Key: FLINK-8312
> URL: https://issues.apache.org/jira/browse/FLINK-8312
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> With varargs, the Table API can correctly handle scalar function calls whose 
> parameter count exceeds 254.
> This issue intends to add the same support for SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302867#comment-16302867
 ] 

ASF GitHub Bot commented on FLINK-8312:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158603987
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,34 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUDFWithLongVarargs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+tEnv.registerFunction("varudf", VarUDF)
+
+val parameters = (0 until 255).map(_ => "c").mkString(",")
+val sqlQuery = s"SELECT varudf($parameters) FROM T1"
+
+val t1 = 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "510",
+  "1275",
+  "2805")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object VarUDF extends ScalarFunction {
--- End diff --

Suggest using an existing test ScalarFunction, such as 
`org.apache.flink.table.expressions.utils.userDefinedScalarFunctions#Func15`, 
OR moving VarUDF into 
`org.apache.flink.table.expressions.utils.userDefinedScalarFunctions`.
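
The diff truncates the body of `VarUDF`. A reconstruction consistent with the 
expected results in the test above ("510", "1275", "2805", i.e. 255 times the 
length of each row's `c` string) would sum the lengths of its varargs 
arguments; this is a sketch, not necessarily the PR's actual code:

```scala
import org.apache.flink.table.functions.ScalarFunction
import scala.annotation.varargs

object VarUDF extends ScalarFunction {
  // @varargs generates a Java-style varargs signature so the SQL call
  // varudf(c, c, ..., c) with 255 arguments can be matched by reflection.
  @varargs
  def eval(values: String*): Int = values.map(_.length).sum
}
```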


> Fix ScalarFunction varargs length exceeds 254 for SQL
> -
>
> Key: FLINK-8312
> URL: https://issues.apache.org/jira/browse/FLINK-8312
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> With varargs, the Table API can correctly handle scalar function calls whose 
> parameter count exceeds 254.
> This issue intends to add the same support for SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8312) Fix ScalarFunction varargs length exceeds 254 for SQL

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302868#comment-16302868
 ] 

ASF GitHub Bot commented on FLINK-8312:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158604394
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -153,15 +154,19 @@ object ScalarSqlFunction {
   override def getOperandCountRange: SqlOperandCountRange = {
 var min = 255
 var max = -1
+var isVarargs = false
 signatures.foreach( sig => {
-  var len = sig.length
-  if (len > 0 && sig(sig.length - 1).isArray) {
-max = 254  // according to JVM spec 4.3.3
-len = sig.length - 1
+  var len = sig._2.length
+  if (len > 0 && sig._1 && sig._2(sig._2.length - 1).isArray) {
+isVarargs = true
+len = sig._2.length - 1
--- End diff --

**Approach 1:**
Replace `sig._2.length - 1` with `len - 1`, i.e. `len = sig._2.length - 1` 
becomes `len = len - 1`.

**Approach 2:**

    methods.foreach(
      m => {
        var len = m.getParameterTypes.length
        if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
          isVarargs = true
          len = len - 1
        }
        max = Math.max(len, max)
        min = Math.min(len, min)
      })

Using approach 2 we can remove 
`val signatures = methods.map(m => m.isVarArgs -> m.getParameterTypes)`.

What do you think?
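
For context, a sketch of what `getOperandCountRange` might look like with 
approach 2 applied, assuming `methods: Seq[java.lang.reflect.Method]` holds 
the eval() overloads; returning `SqlOperandCountRanges.from(min)` in the 
varargs case is an assumption about the surrounding logic, not a quote of the 
final code:

```scala
import java.lang.reflect.Method
import org.apache.calcite.sql.SqlOperandCountRange
import org.apache.calcite.sql.type.SqlOperandCountRanges

def getOperandCountRange(methods: Seq[Method]): SqlOperandCountRange = {
  var min = 254
  var max = -1
  var isVarargs = false
  methods.foreach { m =>
    var len = m.getParameterTypes.length
    // A varargs method's trailing parameter is compiled to an array;
    // it does not count towards the fixed-arity minimum.
    if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
      isVarargs = true
      len -= 1
    }
    max = Math.max(len, max)
    min = Math.min(len, min)
  }
  if (isVarargs) SqlOperandCountRanges.from(min) // any count >= min
  else SqlOperandCountRanges.between(min, max)
}
```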



> Fix ScalarFunction varargs length exceeds 254 for SQL
> -
>
> Key: FLINK-8312
> URL: https://issues.apache.org/jira/browse/FLINK-8312
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> With varargs, the Table API can correctly handle scalar function calls whose 
> parameter count exceeds 254.
> This issue intends to add the same support for SQL.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158604394
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -153,15 +154,19 @@ object ScalarSqlFunction {
   override def getOperandCountRange: SqlOperandCountRange = {
 var min = 255
 var max = -1
+var isVarargs = false
 signatures.foreach( sig => {
-  var len = sig.length
-  if (len > 0 && sig(sig.length - 1).isArray) {
-max = 254  // according to JVM spec 4.3.3
-len = sig.length - 1
+  var len = sig._2.length
+  if (len > 0 && sig._1 && sig._2(sig._2.length - 1).isArray) {
+isVarargs = true
+len = sig._2.length - 1
--- End diff --

**Approach 1:**
Replace `sig._2.length - 1` with `len - 1`, i.e. `len = sig._2.length - 1` 
becomes `len = len - 1`.

**Approach 2:**

    methods.foreach(
      m => {
        var len = m.getParameterTypes.length
        if (len > 0 && m.isVarArgs && m.getParameterTypes()(len - 1).isArray) {
          isVarargs = true
          len = len - 1
        }
        max = Math.max(len, max)
        min = Math.min(len, min)
      })

Using approach 2 we can remove 
`val signatures = methods.map(m => m.isVarArgs -> m.getParameterTypes)`.

What do you think?



---


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158603997
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/utils/ScalarSqlFunction.scala
 ---
@@ -153,15 +154,19 @@ object ScalarSqlFunction {
   override def getOperandCountRange: SqlOperandCountRange = {
 var min = 255
--- End diff --

min=254 is enough.


---


[GitHub] flink pull request #5206: [FLINK-8312][TableAPI && SQL] Fix ScalarFunction v...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5206#discussion_r158603987
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,34 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUDFWithLongVarargs(): Unit = {
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+tEnv.registerFunction("varudf", VarUDF)
+
+val parameters = (0 until 255).map(_ => "c").mkString(",")
+val sqlQuery = s"SELECT varudf($parameters) FROM T1"
+
+val t1 = 
StreamTestData.getSmall3TupleDataStream(env).toTable(tEnv).as('a, 'b, 'c)
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "510",
+  "1275",
+  "2805")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+object VarUDF extends ScalarFunction {
--- End diff --

Suggest using an existing test ScalarFunction, such as 
`org.apache.flink.table.expressions.utils.userDefinedScalarFunctions#Func15`, 
OR moving VarUDF into 
`org.apache.flink.table.expressions.utils.userDefinedScalarFunctions`.


---


[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302839#comment-16302839
 ] 

ASF GitHub Bot commented on FLINK-8301:
---

Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5203
  
Thanks @sunjincheng121, the PR has been updated.


> Support Unicode in codegen for SQL && TableAPI
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" is generated 
> as "\\u0001", so a function call like concat(str, "\u0001") produces a wrong 
> result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'->   codegen:  "\u0001abc"
> literal: '\u0022\' ->   codegen:  "\"\\"
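
The fix amounts to escaping only what Java source syntax requires (quote, 
backslash, control characters) while passing other Unicode characters through 
verbatim. A minimal illustrative escaper, not the PR's actual code generator:

```scala
// Turn a runtime string into source text for a generated string literal.
// Printable Unicode is kept as-is instead of being round-tripped through
// a doubled backslash ("\\u0001"), which is the bug described above.
def toCodegenLiteral(s: String): String = {
  val sb = new StringBuilder("\"")
  s.foreach {
    case '"'          => sb.append("\\\"")
    case '\\'         => sb.append("\\\\")
    case c if c < ' ' => sb.append(f"\\u${c.toInt}%04x") // control chars
    case c            => sb.append(c)                    // Unicode verbatim
  }
  sb.append('"').toString
}
// e.g. an input holding a quote and a backslash yields the source text
// "\"\\", matching the issue's second example.
```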



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #5203: [FLINK-8301] Support Unicode in codegen for SQL && TableA...

2017-12-24 Thread Xpray
Github user Xpray commented on the issue:

https://github.com/apache/flink/pull/5203
  
Thanks @sunjincheng121, the PR has been updated.


---


[jira] [Assigned] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reassigned FLINK-8317:
---

Assignee: Gary Yao

> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method {{CompletableFuture triggerSavepoint(long timestamp, String 
> targetDirectory)}} to the interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8317:

Description: 
Enable triggering of savepoints via RestfulGateway:
* Add method {{CompletableFuture triggerSavepoint(long timestamp, String 
targetDirectory)}} to the interface
* Implement method in {{Dispatcher}} and {{JobMaster}}

  was:Enable triggering of savepoints via RestfulGateway.

Summary: Enable Triggering of Savepoints via RestfulGateway  (was: 
Enable triggering of Savepoints via RestfulGateway)
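
A sketch of the proposed gateway addition in trait form. The generic parameter 
of CompletableFuture was lost to HTML stripping in this archive, so 
CompletableFuture[String] (completing with the savepoint path) is an 
assumption:

```scala
import java.util.concurrent.CompletableFuture

// Hypothetical sketch of the RestfulGateway extension; the return type
// CompletableFuture[String] (the savepoint path) is assumed, not quoted.
trait SavepointTriggeringGateway {
  def triggerSavepoint(timestamp: Long, targetDirectory: String): CompletableFuture[String]
}
```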

> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints via RestfulGateway:
> * Add method {{CompletableFuture triggerSavepoint(long timestamp, String 
> targetDirectory)}} to the interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8317) Enable Triggering of Savepoints via RestfulGateway

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8317:

Description: 
Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
* Add method {{CompletableFuture triggerSavepoint(long timestamp, String 
targetDirectory)}} to the interface
* Implement method in {{Dispatcher}} and {{JobMaster}}

  was:
Enable triggering of savepoints via RestfulGateway:
* Add method {{CompletableFuture triggerSavepoint(long timestamp, String 
targetDirectory)}} to the interface
* Implement method in {{Dispatcher}} and {{JobMaster}}


> Enable Triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints in FLIP-6 mode via RestfulGateway:
> * Add method {{CompletableFuture triggerSavepoint(long timestamp, String 
> targetDirectory)}} to the interface
> * Implement method in {{Dispatcher}} and {{JobMaster}}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-8317) Enable triggering of Savepoints via RestfulGateway

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8317?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-8317:

Description: Enable triggering of savepoints via RestfulGateway.  (was: 
Enable triggering of savepoints via HTTP:
* It must be possible to trigger savepoints if the Dispatcher is absent (job 
mode)
* )
Summary: Enable triggering of Savepoints via RestfulGateway  (was: 
Expose met triggering of Savepoints via HTTP)

> Enable triggering of Savepoints via RestfulGateway
> --
>
> Key: FLINK-8317
> URL: https://issues.apache.org/jira/browse/FLINK-8317
> Project: Flink
>  Issue Type: New Feature
>  Components: Distributed Coordination, REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Enable triggering of savepoints via RestfulGateway.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-8317) Expose met triggering of Savepoints via HTTP

2017-12-24 Thread Gary Yao (JIRA)
Gary Yao created FLINK-8317:
---

 Summary: Expose met triggering of Savepoints via HTTP
 Key: FLINK-8317
 URL: https://issues.apache.org/jira/browse/FLINK-8317
 Project: Flink
  Issue Type: New Feature
  Components: Distributed Coordination, REST
Affects Versions: 1.5.0
Reporter: Gary Yao
 Fix For: 1.5.0


Enable triggering of savepoints via HTTP:
* It must be possible to trigger savepoints if the Dispatcher is absent (job 
mode)
* 



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7780) Integrate savepoint command into REST client

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-7780.
---
Resolution: Fixed

Re-opened to edit the labels.

> Integrate savepoint command into REST client
> 
>
> Key: FLINK-7780
> URL: https://issues.apache.org/jira/browse/FLINK-7780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7780) Integrate savepoint command into REST client

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao updated FLINK-7780:

Labels: flip-6  (was: )

> Integrate savepoint command into REST client
> 
>
> Key: FLINK-7780
> URL: https://issues.apache.org/jira/browse/FLINK-7780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Reopened] (FLINK-7780) Integrate savepoint command into REST client

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao reopened FLINK-7780:
-
  Assignee: Gary Yao  (was: Chesnay Schepler)

> Integrate savepoint command into REST client
> 
>
> Key: FLINK-7780
> URL: https://issues.apache.org/jira/browse/FLINK-7780
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Gary Yao
>Priority: Blocker
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-4834) Implement unified High Availability Services Abstraction

2017-12-24 Thread Gary Yao (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302812#comment-16302812
 ] 

Gary Yao commented on FLINK-4834:
-

All subtasks are done. Can this be closed?

> Implement unified High Availability Services Abstraction
> 
>
> Key: FLINK-4834
> URL: https://issues.apache.org/jira/browse/FLINK-4834
> Project: Flink
>  Issue Type: New Feature
>  Components: Cluster Management
> Environment: FLIP-6 feature branch
>Reporter: Stephan Ewen
>  Labels: flip-6
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-8231) Extend AbstractRestHandler to return response headers

2017-12-24 Thread Gary Yao (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gary Yao closed FLINK-8231.
---
Resolution: Won't Do

Not needed for now.

> Extend AbstractRestHandler to return response headers
> -
>
> Key: FLINK-8231
> URL: https://issues.apache.org/jira/browse/FLINK-8231
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{AbstractRestHandler}} should be able to return a set of response 
> headers when it creates a new resource (POST call) and wants to set the 
> redirection header, for example.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8299) Retrieve ExecutionResult by REST polling

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8299?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302807#comment-16302807
 ] 

ASF GitHub Bot commented on FLINK-8299:
---

GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5207

[FLINK-8299][flip6] Poll JobExecutionResult after job submission

## What is the purpose of the change

*Poll JobExecutionResult after job submission. This is needed, for example, 
to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on 
#5194.*

CC: @tillrohrmann 


## Brief change log

  - *Retrieve JobExecutionResult after job submission in 
`RestClusterClient`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new classes and changed classes.*
  - *Manually ran the job in examples/batch/WordCount.jar and verified that 
the results are correctly collected/printed.*
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8299

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5207


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao 
Date:   2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission
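
A hedged sketch of the polling pattern this PR describes: after submission, 
the client repeatedly queries a result endpoint until the Dispatcher reports a 
cached JobExecutionResult. `fetchResult` is a hypothetical stand-in for the 
REST call, not the actual `RestClusterClient` API:

```scala
import scala.annotation.tailrec

// Hypothetical response shape: `completed` flips to true once the
// Dispatcher has cached a JobExecutionResult for the submitted job.
final case class PollResponse(completed: Boolean, result: Option[String])

@tailrec
def pollJobResult(fetchResult: () => PollResponse, intervalMillis: Long = 500): String = {
  val response = fetchResult()
  if (response.completed)
    response.result.getOrElse(throw new IllegalStateException("completed without a result"))
  else {
    Thread.sleep(intervalMillis) // wait before the next REST poll
    pollJobResult(fetchResult, intervalMillis)
  }
}
```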




> Retrieve ExecutionResult by REST polling
> 
>
> Key: FLINK-8299
> URL: https://issues.apache.org/jira/browse/FLINK-8299
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Retrieve the {{ExecutionResult}} from a finished Flink job via the 
> {{RestClusterClient}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5207: [FLINK-8299][flip6] Poll JobExecutionResult after ...

2017-12-24 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/5207

[FLINK-8299][flip6] Poll JobExecutionResult after job submission

## What is the purpose of the change

*Poll JobExecutionResult after job submission. This is needed, for example, 
to enable `collect()` calls from the job in FLIP-6 mode. This PR is based on 
#5194.*

CC: @tillrohrmann 


## Brief change log

  - *Retrieve JobExecutionResult after job submission in 
`RestClusterClient`*

## Verifying this change

This change added tests and can be verified as follows:

  - *Added unit tests for all new classes and changed classes.*
  - *Manually ran the job in examples/batch/WordCount.jar and verified that 
the results are correctly collected/printed.*
  
## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-8299

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5207.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5207


commit e91f15fcbbe52d6d47cc1ba3d35ae4768fc6309d
Author: gyao 
Date:   2017-12-19T17:58:53Z

[FLINK-8234][flip6] Cache JobExecutionResult in Dispatcher

- Introduce new JobExecutionResult used by JobMaster to forward the 
information in
  the already existing JobExecutionResult.
- Always cache a JobExecutionResult. Even in case of job failures. In case 
of
  job failures, the serialized exception is stored additionally.
- Introduce new methods to RestfulGateway to allow retrieval of cached
  JobExecutionResults

commit 748745ac3521a20040cbda4056dfd9c53bc24a82
Author: gyao 
Date:   2017-12-20T13:44:03Z

[FLINK-8233][flip6] Add JobExecutionResultHandler

- Allow retrieval of the JobExecutionResult cached in Dispatcher.
- Implement serializer and deserializer for JobExecutionResult.

commit adf091a2770f42d6f8a0c19ab88cc7a208943a32
Author: gyao 
Date:   2017-12-20T13:44:26Z

[hotfix] Clean up ExecutionGraph

- Remove unnecessary throws clause.
- Format whitespace.

commit f5c28527b3a1a0c8ec52f2a5616ebb634397b69c
Author: gyao 
Date:   2017-12-22T23:02:10Z

[FLINK-8299][flip6] Retrieve JobExecutionResult after job submission




---


[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302803#comment-16302803
 ] 

ASF GitHub Bot commented on FLINK-8301:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598514
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
 ---
@@ -352,6 +353,72 @@ class CalcITCase(
 val results = result.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testDeterministicUdfWithUnicodeParameter(): Unit = {
+val data = new mutable.MutableList[(String, String, String)]
+data.+=((null, null, null))
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val udf0 = new LiteralUDF("\"\\", deterministic = true)
+val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+tEnv.registerFunction("udf0", udf0)
+tEnv.registerFunction("udf1", udf1)
+tEnv.registerFunction("udf2", udf2)
+
+// user have to specify '\' with '\\' in SQL
+val sqlQuery = "SELECT " +
+  "udf0('\"') as str1, " +
+  "udf1('\u0001xyz') as str2, " +
+  "udf2('\u0001\u0012') as str3 from T1"
+
+val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+tEnv.registerTable("T1", t1)
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val expected = List("\"\\,\u0001xyz,\u0001\u0012").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
--- End diff --

To reduce IT test time, I suggest merging 
"testDeterministicUdfWithUnicodeParameter" and 
"testNonDeterministicUdfWithUnicodeParameter" into one test case, i.e. we 
create two instances with different deterministic values, something as 
follows:

    val udf00 = new LiteralUDF("\"\\", deterministic = false)
    val udf01 = new LiteralUDF("\"\\", deterministic = true)
    ...
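
For reference, a plausible `LiteralUDF` consistent with how the tests 
construct it (a hypothetical reconstruction; the real class lives in the PR's 
test sources): it echoes its configured literal and lets the test toggle 
determinism, so both variants can be registered side by side in one IT case.

```scala
import org.apache.flink.table.functions.ScalarFunction

// Hypothetical reconstruction of the test UDF: ignores its input and
// returns the configured literal, with determinism toggled per instance.
class LiteralUDF(literal: String, deterministic: Boolean) extends ScalarFunction {
  def eval(x: String): String = literal
  override def isDeterministic: Boolean = deterministic
}
```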


> Support Unicode in codegen for SQL && TableAPI
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" is generated 
> as "\\u0001", so a function call like concat(str, "\u0001") produces a wrong 
> result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'->   codegen:  "\u0001abc"
> literal: '\u0022\' ->   codegen:  "\"\\"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302801#comment-16302801
 ] 

ASF GitHub Bot commented on FLINK-8301:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598528
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
 ---
@@ -541,6 +541,48 @@ class CalcITCase(
   "default-nosharp,Sunny-nosharp,kevin2-nosharp"
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
+  ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+  ("x\u0001y", "y\"z", "z\\\"\u0004z")
+)
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val splitUDF = new SplitUDF(deterministic = true)
+val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .select(splitUDF('a, "\u0001", 0) as 'a,
+ splitUDF('b, "\"", 1) as 'b,
+ splitUDF('c, "\\\"\u0004", 0) as 'c
+ )
+val results = ds.collect()
+val expected = List(
+  "a,d,e", "x,z,z"
+).mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
--- End diff --

Same suggestion as above.


> Support Unicode in codegen for SQL && TableAPI
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" is generated 
> as "\\u0001", so a function call like concat(str, "\u0001") produces a wrong 
> result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'->   codegen:  "\u0001abc"
> literal: '\u0022\' ->   codegen:  "\"\\"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302802#comment-16302802
 ] 

ASF GitHub Bot commented on FLINK-8301:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598552
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,84 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testDeterministicUdfWithUnicodeParameter(): Unit = {
+val data = new mutable.MutableList[(String, String, String)]
+data.+=((null, null, null))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val udf0 = new LiteralUDF("\"\\", deterministic = true)
+val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+tEnv.registerFunction("udf0", udf0)
+tEnv.registerFunction("udf1", udf1)
+tEnv.registerFunction("udf2", udf2)
+
+// user have to specify '\' with '\\' in SQL
+val sqlQuery = "SELECT " +
+  "udf0('\"') as str1, " +
+  "udf1('\u0001xyz') as str2, " +
+  "udf2('\u0001\u0012') as str3 from T1"
+
+val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List("\"\\,\u0001xyz,\u0001\u0012")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
+val data = new mutable.MutableList[(String, String, String)]
--- End diff --

Same suggestion as above.


> Support Unicode in codegen for SQL && TableAPI
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" is generated 
> as "\\u0001", so a function call like concat(str, "\u0001") produces a wrong 
> result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'->   codegen:  "\u0001abc"
> literal: '\u0022\' ->   codegen:  "\"\\"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8301) Support Unicode in codegen for SQL && TableAPI

2017-12-24 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8301?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16302804#comment-16302804
 ] 

ASF GitHub Bot commented on FLINK-8301:
---

Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598561
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ---
@@ -352,4 +354,64 @@ class CalcITCase extends 
StreamingMultipleProgramsTestBase {
   "{9=Comment#3}")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
+  ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+  ("x\u0001y", "y\"z", "z\\\"\u0004z")
+)
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+val splitUDF = new SplitUDF(deterministic = true)
+val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+  .select(splitUDF('a, "\u0001", 0) as 'a,
+  splitUDF('b, "\"", 1) as 'b,
+  splitUDF('c, "\\\"\u0004", 0) as 'c
+  )
+val results = ds.toAppendStream[Row]
+results.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+val expected = mutable.MutableList(
+  "a,d,e", "x,z,z"
+)
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
--- End diff --

Same suggestion as above.


> Support Unicode in codegen for SQL && TableAPI
> --
>
> Key: FLINK-8301
> URL: https://issues.apache.org/jira/browse/FLINK-8301
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>
> The current code generation does not support Unicode: "\u0001" is generated 
> as "\\u0001", so a function call like concat(str, "\u0001") produces a wrong 
> result.
> This issue intends to handle char/varchar literals correctly; some examples 
> follow below.
> literal: '\u0001abc'->   codegen:  "\u0001abc"
> literal: '\u0022\' ->   codegen:  "\"\\"



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598552
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,84 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testDeterministicUdfWithUnicodeParameter(): Unit = {
+val data = new mutable.MutableList[(String, String, String)]
+data.+=((null, null, null))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.clear
+
+val udf0 = new LiteralUDF("\"\\", deterministic = true)
+val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+tEnv.registerFunction("udf0", udf0)
+tEnv.registerFunction("udf1", udf1)
+tEnv.registerFunction("udf2", udf2)
+
+// user have to specify '\' with '\\' in SQL
+val sqlQuery = "SELECT " +
+  "udf0('\"') as str1, " +
+  "udf1('\u0001xyz') as str2, " +
+  "udf2('\u0001\u0012') as str3 from T1"
+
+val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+tEnv.registerTable("T1", t1)
+
+val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List("\"\\,\u0001xyz,\u0001\u0012")
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
+val data = new mutable.MutableList[(String, String, String)]
--- End diff --

Same suggestion as above.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598528
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
 ---
@@ -541,6 +541,48 @@ class CalcITCase(
   "default-nosharp,Sunny-nosharp,kevin2-nosharp"
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
+  ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+  ("x\u0001y", "y\"z", "z\\\"\u0004z")
+)
+val env = ExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val splitUDF = new SplitUDF(deterministic = true)
+val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+ .select(splitUDF('a, "\u0001", 0) as 'a,
+ splitUDF('b, "\"", 1) as 'b,
+ splitUDF('c, "\\\"\u0004", 0) as 'c
+ )
+val results = ds.collect()
+val expected = List(
+  "a,d,e", "x,z,z"
+).mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
--- End diff --

Same suggestion as above.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598561
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ---
@@ -352,4 +354,64 @@ class CalcITCase extends 
StreamingMultipleProgramsTestBase {
   "{9=Comment#3}")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
+  ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+  ("x\u0001y", "y\"z", "z\\\"\u0004z")
+)
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+val tEnv = TableEnvironment.getTableEnvironment(env)
+StreamITCase.testResults = mutable.MutableList()
+val splitUDF = new SplitUDF(deterministic = true)
+val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+  .select(splitUDF('a, "\u0001", 0) as 'a,
+  splitUDF('b, "\"", 1) as 'b,
+  splitUDF('c, "\\\"\u0004", 0) as 'c
+  )
+val results = ds.toAppendStream[Row]
+results.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+val expected = mutable.MutableList(
+  "a,d,e", "x,z,z"
+)
+assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+val data = List(
--- End diff --

Same suggestion as above.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598514
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
 ---
@@ -352,6 +353,72 @@ class CalcITCase(
 val results = result.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testDeterministicUdfWithUnicodeParameter(): Unit = {
+val data = new mutable.MutableList[(String, String, String)]
+data.+=((null, null, null))
+
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+
+val udf0 = new LiteralUDF("\"\\", deterministic = true)
+val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+tEnv.registerFunction("udf0", udf0)
+tEnv.registerFunction("udf1", udf1)
+tEnv.registerFunction("udf2", udf2)
+
+// user have to specify '\' with '\\' in SQL
+val sqlQuery = "SELECT " +
+  "udf0('\"') as str1, " +
+  "udf1('\u0001xyz') as str2, " +
+  "udf2('\u0001\u0012') as str3 from T1"
+
+val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+tEnv.registerTable("T1", t1)
+
+val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+val expected = List("\"\\,\u0001xyz,\u0001\u0012").mkString("\n")
+TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
--- End diff --

To reduce IT test time, I suggest merging 
"testDeterministicUdfWithUnicodeParameter" and 
"testNonDeterministicUdfWithUnicodeParameter" into one test case, i.e. we 
create two instances with different deterministic values, something as 
follows:

    val udf00 = new LiteralUDF("\"\\", deterministic = false)
    val udf01 = new LiteralUDF("\"\\", deterministic = true)
    ...


---