twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE URL: https://github.com/apache/flink/pull/7189#discussion_r237402448
########## File path: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala ########## @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase { // We do not assert the proctime in the result, cause it is currently // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime } + + @Test + def testRichUdfs(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setParallelism(1) + val tEnv = TableEnvironment.getTableEnvironment(env) + tEnv.getConfig.setMaxGeneratedCodeLength(1) + StreamITCase.clear + + val data = new mutable.MutableList[(Int, String, Long)] + data.+=((1, "a", 1)) + data.+=((2, "a", 1)) + data.+=((3, "a", 1)) + data.+=((4, "a", 1)) + data.+=((5, "a", 1)) + data.+=((6, "b", 1)) + data.+=((7, "a", 1)) + data.+=((8, "a", 1)) + data.+=((9, "f", 1)) + + val t = env.fromCollection(data) + .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime) + tEnv.registerTable("MyTable", t) + tEnv.registerFunction("prefix", new RichScalarFunc) + val prefix = "PREF" + UserDefinedFunctionTestUtils + .setJobParameters(env, Map("prefix" -> prefix)) + + val sqlQuery = + s""" + |SELECT * + |FROM MyTable + |MATCH_RECOGNIZE ( + | ORDER BY proctime + | MEASURES + | FIRST(id) as firstId, + | prefix(A.name) as prefixedNameA, + | LAST(id) as lastId + | AFTER MATCH SKIP PAST LAST ROW + | PATTERN (A+ C) + | DEFINE + | A AS prefix(A.name) = '$prefix:a' + |) AS T + |""".stripMargin + + val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] + result.addSink(new StreamITCase.StringSink[Row]) + env.execute() + + val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } } class ToMillis extends ScalarFunction { def eval(t: Timestamp): Long = { t.toInstant.toEpochMilli + TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli) } } + +private class RichScalarFunc extends ScalarFunction { Review comment: Give more meaningful name. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services