[https://issues.apache.org/jira/browse/FLINK-7599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16703171#comment-16703171]

ASF GitHub Bot commented on FLINK-7599:
---------------------------------------

twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237450569
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##########
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+    * This query checks:
+    *
+    * 1. count(D.price) produces 0, because no rows matched to D
+    * 2. sum(D.price) produces null, because no rows matched to D
+    * 3. aggregates that take multiple parameters work
+    * 4. aggregates with expressions work
+    */
+  @Test
+  def testCepAggregates(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+    data.+=((1, "a", 1, 0.8, 1))
+    data.+=((2, "z", 2, 0.8, 3))
+    data.+=((3, "b", 1, 0.8, 2))
+    data.+=((4, "c", 1, 0.8, 5))
+    data.+=((5, "d", 4, 0.1, 5))
+    data.+=((6, "a", 2, 1.5, 2))
+    data.+=((7, "b", 2, 0.8, 3))
+    data.+=((8, "c", 1, 0.8, 2))
+    data.+=((9, "h", 2, 0.8, 3))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as startId,
+         |    SUM(A.price) AS sumA,
+         |    COUNT(DISTINCT D.price) AS countD,
+         |    SUM(D.price) as sumD,
+         |    weightedAvg(price, weight) as wAvg,
+         |    AVG(B.price) AS avgB,
+         |    SUM(B.price * B.rate) as sumExprB,
+         |    LAST(id) as endId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ B+ C D? E )
+         |  DEFINE
+         |    A AS SUM(A.price) < 6,
+         |    B AS SUM(B.price * B.rate) < SUM(A.price) AND
+         |         SUM(B.price * B.rate) > 0.2 AND
+         |         SUM(B.price) >= 1 AND
+         |         AVG(B.price) >= 1 AND
+         |         weightedAvg(price, weight) > 1
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8")
+    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
+  }
+
+  @Test
+  def testCepAggregatesWithNullInputs(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[Row]
+    data.+=(Row.of(1:java.lang.Integer, "a", 10:java.lang.Integer))
 
 Review comment:
   nit: hint you can also use `Int.box()`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Support aggregation functions in the define and measures clause of 
> MatchRecognize
> ---------------------------------------------------------------------------------
>
>                 Key: FLINK-7599
>                 URL: https://issues.apache.org/jira/browse/FLINK-7599
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP, Table API & SQL
>            Reporter: Dian Fu
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to