Hi all,

Can anyone advise on this?

I wrote a simple test of the countWindow method (in Kotlin) as below

package aero.airlab.flinkjobs.headingreminder

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Random

object CountWindowTest {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        env.setRuntimeMode(RuntimeExecutionMode.BATCH)

        val rand = Random(0)
        val data = (0..1000).map { Pair(rand.nextInt(10), it) }
        env.fromCollection(data).assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
                .withTimestampAssigner { e, _ -> e.second.toLong() })
            .keyBy { it.first }
            .countWindow(3L, 1)
            .reduce { a, b -> b }
            .keyBy { it.first }
            .filter { it.first == 5 }
            .print()

        env.execute()
    }
}


The beginning of the output is as such

12> (5, 184)
12> (5, 18)
12> (5, 29)
12> (5, 37)
12> (5, 38)
12> (5, 112)
12> (5, 131)

The first line (5, 184) is not in order from the rest.

Is this a bug? The problem disappears if I remove the keyBy after the
reduce.

Thanks.

Reply via email to