Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5203#discussion_r158598552
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala ---
    @@ -481,4 +484,84 @@ class SqlITCase extends StreamingWithStateTestBase {
         assertEquals(expected.sorted, MemoryTableSinkUtil.results.sorted)
       }
     
    +  @Test
    +  def testDeterministicUdfWithUnicodeParameter(): Unit = {
    +    val data = new mutable.MutableList[(String, String, String)]
    +    data.+=((null, null, null))
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.clear
    +
    +    val udf0 = new LiteralUDF("\"\\", deterministic = true)
    +    val udf1 = new LiteralUDF("\u0001xyz", deterministic = true)
    +    val udf2 = new LiteralUDF("\u0001\u0012", deterministic = true)
    +
    +    tEnv.registerFunction("udf0", udf0)
    +    tEnv.registerFunction("udf1", udf1)
    +    tEnv.registerFunction("udf2", udf2)
    +
    +    // user have to specify '\' with '\\' in SQL
    +    val sqlQuery = "SELECT " +
    +      "udf0('\"\\\\') as str1, " +
    +      "udf1('\u0001xyz') as str2, " +
    +      "udf2('\u0001\u0012') as str3 from T1"
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv, 'str1, 'str2, 'str3)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val result = tEnv.sql(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List("\"\\,\u0001xyz,\u0001\u0012")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testNonDeterministicUdfWithUnicodeParameter(): Unit = {
    +    val data = new mutable.MutableList[(String, String, String)]
    --- End diff --
    
    Same suggestion as above.


---

Reply via email to