twalthr commented on a change in pull request #7177: [FLINK-7599] [table] 
Support for aggregates in MATCH_RECOGNIZE
URL: https://github.com/apache/flink/pull/7177#discussion_r237442180
 
 

 ##########
 File path: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/MatchRecognizeITCase.scala
 ##########
 @@ -464,6 +467,129 @@ class MatchRecognizeITCase extends 
StreamingWithStateTestBase {
     assertEquals(expected.sorted, StreamITCase.testResults.sorted)
   }
 
+  /**
+    * This query checks:
+    *
+    * 1. count(D.price) produces 0, because no rows matched to D
+    * 2. sum(D.price) produces null, because no rows matched to D
+    * 3. aggregates that take multiple parameters work
+    * 4. aggregates with expressions work
+    */
+  @Test
+  def testCepAggregates(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    env.setParallelism(1)
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+    tEnv.getConfig.setMaxGeneratedCodeLength(1)
+    StreamITCase.clear
+
+    val data = new mutable.MutableList[(Int, String, Long, Double, Int)]
+    data.+=((1, "a", 1, 0.8, 1))
+    data.+=((2, "z", 2, 0.8, 3))
+    data.+=((3, "b", 1, 0.8, 2))
+    data.+=((4, "c", 1, 0.8, 5))
+    data.+=((5, "d", 4, 0.1, 5))
+    data.+=((6, "a", 2, 1.5, 2))
+    data.+=((7, "b", 2, 0.8, 3))
+    data.+=((8, "c", 1, 0.8, 2))
+    data.+=((9, "h", 2, 0.8, 3))
+
+    val t = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'name, 'price, 'rate, 'weight, 'proctime.proctime)
+    tEnv.registerTable("MyTable", t)
+    tEnv.registerFunction("weightedAvg", new WeightedAvg)
+
+    val sqlQuery =
+      s"""
+         |SELECT *
+         |FROM MyTable
+         |MATCH_RECOGNIZE (
+         |  ORDER BY proctime
+         |  MEASURES
+         |    FIRST(id) as startId,
+         |    SUM(A.price) AS sumA,
+         |    COUNT(DISTINCT D.price) AS countD,
+         |    SUM(D.price) as sumD,
+         |    weightedAvg(price, weight) as wAvg,
+         |    AVG(B.price) AS avgB,
+         |    SUM(B.price * B.rate) as sumExprB,
+         |    LAST(id) as endId
+         |  AFTER MATCH SKIP PAST LAST ROW
+         |  PATTERN (A+ B+ C D? E )
+         |  DEFINE
+         |    A AS SUM(A.price) < 6,
+         |    B AS SUM(B.price * B.rate) < SUM(A.price) AND
+         |         SUM(B.price * B.rate) > 0.2 AND
+         |         SUM(B.price) >= 1 AND
+         |         AVG(B.price) >= 1 AND
+         |         weightedAvg(price, weight) > 1
+         |) AS T
+         |""".stripMargin
+
+    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    result.addSink(new StreamITCase.StringSink[Row])
+    env.execute()
+
+    val expected = mutable.MutableList("1,5,0,null,2,3,3.4,8")
 
 Review comment:
   Produce a second row to be on the safe side?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to