[ 
https://issues.apache.org/jira/browse/FLINK-10597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16702900#comment-16702900
 ] 

ASF GitHub Bot commented on FLINK-10597:
----------------------------------------

twalthr commented on a change in pull request #7189: [FLINK-10597][table] Enabled UDFs support in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7189#discussion_r237402642
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##########
 @@ -543,10 +543,79 @@ class MatchRecognizeITCase extends StreamingWithStateTestBase {
     // We do not assert the proctime in the result, cause it is currently
     // accessed from System.currentTimeMillis(), so there is no graceful way to assert the proctime
   }
+
+  @Test
+  def testRichUdfs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long)]
+    data.+=((1, "a", 1))
+    data.+=((2, "a", 1))
+    data.+=((3, "a", 1))
+    data.+=((4, "a", 1))
+    data.+=((5, "a", 1))
+    data.+=((6, "b", 1))
+    data.+=((7, "a", 1))
+    data.+=((8, "a", 1))
+    data.+=((9, "f", 1))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("prefix", new RichScalarFunc)
+    val prefix = "PREF"
+    UserDefinedFunctionTestUtils
+      .setJobParameters(env, Map("prefix" -> prefix))
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as firstId,
+         |    prefix(A.name) as prefixedNameA,
+         |    LAST(id) as lastId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ C)
+         |  DEFINE
+         |    A AS prefix(A.name) = '$prefix:a'
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,PREF:a,6", "7,PREF:a,9")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
 }
 
 class ToMillis extends ScalarFunction {
   def eval(t: Timestamp): Long = {
     t.toInstant.toEpochMilli + 
TimeZone.getDefault.getOffset(t.toInstant.toEpochMilli)
   }
 }
+
+private class RichScalarFunc extends ScalarFunction {
+
+  private var prefix = "ERROR_VALUE"
+
+  override def open(context: FunctionContext): Unit = {
+    prefix = context.getJobParameter("prefix", "")
+  }
+
+  def eval(value: String): String = {
+    s"$prefix:$value"
+  }
+
+  override def close(): Unit = {
+    prefix = "ERROR_VALUE"
 
 Review comment:
   This line has no meaning so far. Remove the entire method?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable UDFs support in MATCH_RECOGNIZE
> --------------------------------------
>
>                 Key: FLINK-10597
>                 URL: https://issues.apache.org/jira/browse/FLINK-10597
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to