Hequn Cheng created FLINK-13446:
-----------------------------------

             Summary: Row count sliding window outputs incorrectly in blink 
planner
                 Key: FLINK-13446
                 URL: https://issues.apache.org/jira/browse/FLINK-13446
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.9.0
            Reporter: Hequn Cheng


For blink planner, the Row count sliding window outputs incorrectly. The window 
assigner assigns less window than what expected. This means the window outputs 
fewer data. The bug can be reproduced by the following test:
{code:java}
  @Test
  def testGroupWindowWithoutKeyInProjection(): Unit = {
    val data = List(
      (1L, 1, "Hi", 1, 1),
      (2L, 2, "Hello", 2, 2),
      (4L, 2, "Hello", 2, 2),
      (8L, 3, "Hello world", 3, 3),
      (16L, 3, "Hello world", 3, 3))

    val stream = failingDataSource(data)
    val table = stream.toTable(tEnv, 'long, 'int, 'string, 'int2, 'int3, 
'proctime.proctime)

    val weightAvgFun = new WeightedAvg
    val countDistinct = new CountDistinct

    val windowedTable = table
      .window(Slide over 2.rows every 1.rows on 'proctime as 'w)
      .groupBy('w, 'int2, 'int3, 'string)
      .select(weightAvgFun('long, 'int), countDistinct('long))

    val sink = new TestingAppendSink
    windowedTable.toAppendStream[Row].addSink(sink)
    env.execute()

    val expected = Seq("12,2", "8,1", "2,1", "3,2", "1,1")
    assertEquals(expected.sorted, sink.getAppendResults.sorted)
  }
{code}
The expected output is Seq("12,2", "8,1", "2,1", "3,2", "1,1") while the actual 
output is Seq("12,2", "3,2")

To fix the problem, we can correct the assign logic in 
CountSlidingWindowAssigner.assignWindows.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

Reply via email to