[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5203


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598552
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,84 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testDeterministicUdfWithUnicodeParameter(): Unit = {
+    val data = new mutable.MutableList[(String, String, String)]
+    data.+=((null, null, null))
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val udf0 = new LiteralUDF("\"\\", deterministic = true)
+    val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+    val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+    tEnv.registerFunction("udf0", udf0)
+    tEnv.registerFunction("udf1", udf1)
+    tEnv.registerFunction("udf2", udf2)
+
+    // users have to escape '\' as '\\' in SQL
+    val sqlQuery = "SELECT " +
+      "udf0('\"') as str1, " +
+      "udf1('\u0001xyz') as str2, " +
+      "udf2('\u0001\u0012') as str3 from T1"
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("\"\\,\u0001xyz,\u0001\u0012")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
+    val data = new mutable.MutableList[(String, String, String)]
--- End diff --

Same suggestion as above.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598528
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/table/CalcITCase.scala
 ---
@@ -541,6 +541,48 @@ class CalcITCase(
   "default-nosharp,Sunny-nosharp,kevin2-nosharp"
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testDeterministicUDFWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    val splitUDF = new SplitUDF(deterministic = true)
+    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+      .select(splitUDF('a, "\u0001", 0) as 'a,
+        splitUDF('b, "\"", 1) as 'b,
+        splitUDF('c, "\\\"\u0004", 0) as 'c
+      )
+    val results = ds.collect()
+    val expected = List(
+      "a,d,e", "x,z,z"
+    ).mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+    val data = List(
--- End diff --

Same suggestion as above.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598561
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ---
@@ -352,4 +354,64 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
   "{9=Comment#3}")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testDeterministicUDFWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val splitUDF = new SplitUDF(deterministic = true)
+    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+      .select(splitUDF('a, "\u0001", 0) as 'a,
+        splitUDF('b, "\"", 1) as 'b,
+        splitUDF('c, "\\\"\u0004", 0) as 'c
+      )
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = mutable.MutableList(
+      "a,d,e", "x,z,z"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testNonDeterministicUDFWithUnicodeParameter(): Unit = {
+    val data = List(
--- End diff --

Same suggestion as above.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-24 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158598514
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/CalcITCase.scala
 ---
@@ -352,6 +353,72 @@ class CalcITCase(
 val results = result.toDataSet[Row].collect()
 TestBaseUtils.compareResultAsText(results.asJava, expected)
   }
+
+  @Test
+  def testDeterministicUdfWithUnicodeParameter(): Unit = {
+    val data = new mutable.MutableList[(String, String, String)]
+    data.+=((null, null, null))
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val udf0 = new LiteralUDF("\"\\", deterministic = true)
+    val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
+    val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
+
+    tEnv.registerFunction("udf0", udf0)
+    tEnv.registerFunction("udf1", udf1)
+    tEnv.registerFunction("udf2", udf2)
+
+    // users have to escape '\' as '\\' in SQL
+    val sqlQuery = "SELECT " +
+      "udf0('\"') as str1, " +
+      "udf1('\u0001xyz') as str2, " +
+      "udf2('\u0001\u0012') as str3 from T1"
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+    tEnv.registerTable("T1", t1)
+
+    val results = tEnv.sql(sqlQuery).toDataSet[Row].collect()
+
+    val expected = List("\"\\,\u0001xyz,\u0001\u0012").mkString("\n")
+    TestBaseUtils.compareResultAsText(results.asJava, expected)
+  }
+
+  @Test
+  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
--- End diff --

To reduce IT test time, I suggest merging 
"testDeterministicUdfWithUnicodeParameter" and 
"testNonDeterministicUdfWithUnicodeParameter" into one test case, 
i.e., creating two instances with different deterministic values, something as follows:
```
val udf00 = new LiteralUDF("\"\\", deterministic = false)
val udf01 = new LiteralUDF("\"\\", deterministic = true)
...
```


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-23 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158582395
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,48 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUnicodeParameter(): Unit = {
+    val data = new mutable.MutableList[(String, String, String)]
+    data.+=((null, null, null)) //""
--- End diff --

Remove the useless comment, or give more detail about this test row's data.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-23 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158582801
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ---
@@ -352,4 +354,38 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
   "{9=Comment#3}")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUDFWithUnicodeParameter(): Unit = {
+    val data = List(
+      ("a\u0001b", "c\"d", "e\\\"\u0004f"),
+      ("x\u0001y", "y\"z", "z\\\"\u0004z")
+    )
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.testResults = mutable.MutableList()
+    val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+      .select(SplitUDF('a, "\u0001", 0) as 'a,
+        SplitUDF('b, "\"", 1) as 'b,
+        SplitUDF('c, "\\\"\u0004", 0) as 'c
+      )
+    val results = ds.toAppendStream[Row]
+    results.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+    val expected = mutable.MutableList(
+      "a,d,e", "x,z,z"
+    )
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+}
+
+object SplitUDF extends ScalarFunction {
+  def eval(x: String, sep: String, index: Int): String = {
+    val splits = StringUtils.splitByWholeSeparator(x, sep)
--- End diff --

Add a test case with `isDeterministic: Boolean = false` as well.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-23 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158582778
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,48 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUnicodeParameter(): Unit = {
--- End diff --

Add a batch test case as well.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-23 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158582780
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala
 ---
@@ -352,4 +354,38 @@ class CalcITCase extends StreamingMultipleProgramsTestBase {
   "{9=Comment#3}")
 assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testUDFWithUnicodeParameter(): Unit = {
--- End diff --

Add a batch test case as well.


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-23 Thread sunjincheng121
Github user sunjincheng121 commented on a diff in the pull request:

https://github.com/apache/flink/pull/5203#discussion_r158582750
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -481,4 +484,48 @@ class SqlITCase extends StreamingWithStateTestBase {
 assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
   }
 
+  @Test
+  def testUnicodeParameter(): Unit = {
+    val data = new mutable.MutableList[(String, String, String)]
+    data.+=((null, null, null)) //""
+
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    StreamITCase.clear
+
+    val udf0 = new LiteralUDF("\"\\")
+    val udf1 = new LiteralUDF("\u0001xyz")
+    val udf2 = new LiteralUDF("\u0001\u0012")
+
+    tEnv.registerFunction("udf0", udf0)
+    tEnv.registerFunction("udf1", udf1)
+    tEnv.registerFunction("udf2", udf2)
+
+    // users have to escape '\' as '\\' in SQL
+    val sqlQuery = "SELECT " +
+      "udf0('\"') as str1, " +
+      "udf1('\u0001xyz') as str2, " +
+      "udf2('\u0001\u0012') as str3 from T1"
+
+    val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
+
+    tEnv.registerTable("T1", t1)
+
+    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = List("\"\\,\u0001xyz,\u0001\u0012")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+}
+
+class LiteralUDF(constant: String) extends ScalarFunction {
+  def eval(x: String): String = {
+    assert(x == constant)
+    x
+  }
+  override def isDeterministic: Boolean = false
--- End diff --

I think we should also add the case of isDeterministic=true. I think expression 
reduction applies different processing logic in that case.
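To illustrate the point above: a deterministic call on literal arguments can be folded at planning time (expression reduction), which re-generates the literal and therefore exercises the escape/unescape path, while a non-deterministic call stays in the plan until runtime. The following is a toy sketch in Java, not Flink's actual planner code; all class and method names are illustrative:

```java
import java.util.function.UnaryOperator;

// Toy expression tree: literals and calls that may or may not be deterministic.
interface Expr {}

final class Lit implements Expr {
    final String value;
    Lit(String value) { this.value = value; }
    @Override public boolean equals(Object o) {
        return o instanceof Lit && ((Lit) o).value.equals(value);
    }
    @Override public int hashCode() { return value.hashCode(); }
}

final class Call implements Expr {
    final UnaryOperator<String> fn;
    final Expr arg;
    final boolean deterministic;
    Call(UnaryOperator<String> fn, Expr arg, boolean deterministic) {
        this.fn = fn; this.arg = arg; this.deterministic = deterministic;
    }
}

public class ReduceSketch {
    // Fold only deterministic calls whose argument is already a literal;
    // non-deterministic calls must be kept for runtime evaluation.
    static Expr reduce(Expr e) {
        if (e instanceof Call) {
            Call c = (Call) e;
            if (c.deterministic && c.arg instanceof Lit) {
                return new Lit(c.fn.apply(((Lit) c.arg).value));
            }
        }
        return e;
    }

    public static void main(String[] args) {
        // Deterministic: folded at "planning time", so the literal (including
        // its Unicode character) is re-generated and must round-trip correctly.
        Expr folded = reduce(new Call(String::toUpperCase, new Lit("a\u0001b"), true));
        assert folded.equals(new Lit("A\u0001B"));
        // Non-deterministic: left untouched until runtime.
        Expr call = new Call(String::toUpperCase, new Lit("a\u0001b"), false);
        assert reduce(call) == call;
    }
}
```

This is why the deterministic and non-deterministic variants hit different code paths and both deserve coverage.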


---


[GitHub] flink pull request #5203: [FLINK-8301] Support Unicode in codegen for SQL &&...

2017-12-21 Thread Xpray
GitHub user Xpray opened a pull request:

https://github.com/apache/flink/pull/5203

[FLINK-8301] Support Unicode in codegen for SQL && TableAPI





## What is the purpose of the change

*Support Unicode literals in SQL and handle code generation correctly.*


## Brief change log

  - *SQL and the Table API see different literals when Unicode is used: after SQL 
parsing, the literal is "\\u0001" with length = 6, while the Table API gets "\u0001" with 
length = 1.*
  - *Before generating code, the SQL literal is unescaped first so that the escaped 
sequence becomes one character, then escaped again to generate a valid Java String.*
  - *A Unicode literal from the Table API is already a one-character 
String, so it only needs escaping before code generation.*
  - *In short: in the SQL path a literal needs unescape and then escape; in the Table API 
path a literal is escaped first and then joins the same path as SQL.*
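The unescape-then-escape round trip described above can be sketched as follows. This is an illustrative sketch in Java (the language of the generated code); the helper names `unescapeUnicode` and `escapeForCodegen` are hypothetical and do not match Flink's actual codegen methods:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class UnicodeLiteralSketch {

    // SQL path: the parser hands over the six-character sequence "\\u0001";
    // unescape it into the single character it denotes.
    static String unescapeUnicode(String s) {
        Matcher m = Pattern.compile("\\\\u([0-9a-fA-F]{4})").matcher(s);
        StringBuffer sb = new StringBuffer();
        while (m.find()) {
            char c = (char) Integer.parseInt(m.group(1), 16);
            m.appendReplacement(sb, Matcher.quoteReplacement(String.valueOf(c)));
        }
        m.appendTail(sb);
        return sb.toString();
    }

    // Common step for both paths: escape non-printable characters again so the
    // generated Java source contains a valid string literal.
    static String escapeForCodegen(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            if (c < 0x20 || c > 0x7e) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String sqlLiteral = "\\u0001xyz";     // 9 chars, as seen after SQL parsing
        String tableApiLiteral = "\u0001xyz"; // 4 chars, as seen from the Table API
        // SQL path: unescape first; then both paths share the escape step.
        assert unescapeUnicode(sqlLiteral).equals(tableApiLiteral);
        assert escapeForCodegen(tableApiLiteral).equals(sqlLiteral);
    }
}
```

With this round trip, both front ends converge on the same escaped literal before code generation, which is the core idea of the change.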


## Verifying this change

This change added tests and can be verified as follows:

  - *Added tests for both SQL and the Table API with Unicode parameters*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): 
no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: 
no
  - The serializers: 
no
  - The runtime per-record code paths (performance sensitive): 
no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: 
no
  - The S3 file system connector: 
no

## Documentation

  - Does this pull request introduce a new feature? no


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Xpray/flink FLINK-8301

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5203.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5203


commit 945d997c612b1048d600adabb73bf16b637c7f2b
Author: Xpray 
Date:   2017-12-22T02:09:01Z

[FLINK-8301] Support Unicode in codegen for TableAPI && SQL




---