Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5555#discussion_r183867019
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala ---
    @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase {
         (8L, 8, "Hello World"),
         (20L, 20, "Hello World"))
     
    +  @Test
    +  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env)
    +      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val sqlQuery = "SELECT a, " +
    +      "  SUM(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
    +      "  MIN(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " +
    +      "  COLLECT(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " +
    +      "FROM MyTable"
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List(
    +      "1,1,1,{1=1}",
    +      "2,2,2,{2=1}",
    +      "2,3,1,{1=1, 2=1}",
    +      "3,2,2,{2=1}",
    +      "3,2,2,{2=1}",
    +      "3,5,2,{2=1, 3=1}",
    +      "4,2,2,{2=1}",
    +      "4,3,1,{1=1, 2=1}",
    +      "4,3,1,{1=1, 2=1}",
    +      "4,3,1,{1=1, 2=1}",
    +      "5,1,1,{1=1}",
    +      "5,4,1,{1=1, 3=1}",
    +      "5,4,1,{1=1, 3=1}",
    +      "5,6,1,{1=1, 2=1, 3=1}",
    +      "5,5,2,{2=1, 3=1}")
    +    assertEquals(expected, StreamITCase.testResults)
    +  }
    +
    +  @Test
    +  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    env.setParallelism(1)
    +    StreamITCase.clear
    +
    +    val t = StreamTestData.get5TupleDataStream(env)
    +      .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
    +    tEnv.registerTable("MyTable", t)
    +
    +    val sqlQuery = "SELECT a, " +
    +      "  COUNT(e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
    +      "  SUM(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
    +      "  MIN(DISTINCT e) OVER (" +
    +      "    PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
    +      "FROM MyTable"
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List(
    +      "1,1,1,1",
    +      "2,1,2,2",
    +      "2,2,3,1",
    +      "3,1,2,2",
    +      "3,2,2,2",
    +      "3,3,5,2",
    +      "4,1,2,2",
    +      "4,2,3,1",
    +      "4,3,3,1",
    +      "4,4,3,1",
    +      "5,1,1,1",
    +      "5,2,4,1",
    +      "5,3,4,1",
    +      "5,4,6,1",
    +      "5,5,6,1")
    +    assertEquals(expected, StreamITCase.testResults)
    +  }
    +
    +  @Test
    +  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
    --- End diff --
    
    Great, thanks!


---

Reply via email to