twalthr commented on a change in pull request #7189: [FLINK-10597][table] 
Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402448
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##########
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
     // accessed from System.currentTimeMillis(), so there is no graceful way 
to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long)]
+    data.+=((1, "a", 1))
+    data.+=((2, "a", 1))
+    data.+=((3, "a", 1))
+    data.+=((4, "a", 1))
+    data.+=((5, "a", 1))
+    data.+=((6, "b", 1))
+    data.+=((7, "a", 1))
+    data.+=((8, "a", 1))
+    data.+=((9, "f", 1))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("prefix", new RichScalarFunc)
+    val prefix = "PREF"
+    UserDefinedFunctionTestUtils
+      .setJobParameters(env, Map("prefix" -> prefix))
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as firstId,
+         |    prefix(A.name) as prefixedNameA,
+         |    LAST(id) as lastId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ C)
+         |  DEFINE
+         |    A AS prefix(A.name) = '$prefix:a'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
     t.toInstant.toEpochMilli + 
TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
 
 Review comment:
   Please give this class (`RichScalarFunc`) a more meaningful name.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to