Github user sunjincheng121 commented on a diff in the pull request: https://github.com/apache/flink/pull/5203#discussion_r158598561 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/CalcITCase.scala --- @@ -352,4 +354,64 @@ class CalcITCase extends StreamingMultipleProgramsTestBase { "{9=Comment#3}") assertEquals(expected.sorted, StreamITCase.testResults.sorted) } + + @Test + def testDeterministicUDFWithUnicodeParameter(): Unit = { + val data = List( + ("a\u0001b", "c\"d", "e\\\"\u0004f"), + ("x\u0001y", "y\"z", "z\\\"\u0004z") + ) + val env = StreamExecutionEnvironment.getExecutionEnvironment + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + val splitUDF = new SplitUDF(deterministic = true) + val ds = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c) + .select(splitUDF('a, "\u0001", 0) as 'a, + splitUDF('b, "\"", 1) as 'b, + splitUDF('c, "\\\"\u0004", 0) as 'c + ) + val results = ds.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink[Row]) + env.execute() + val expected = mutable.MutableList( + "a,d,e", "x,z,z" + ) + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testNonDeterministicUDFWithUnicodeParameter(): Unit = { + val data = List( --- End diff -- Same suggest as above.
---