Hi Yan,

I do not think it is a bug. We simply cannot assume the order in which an operator receives its input.

Best,
Guowei
On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:

> Yes, it does not happen in streaming mode. Is this considered a bug, or is
> it by design?
>
> Thanks!
>
> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi,
>>
>> I did not run your program directly, but I see that you are using the
>> Batch execution mode. I suspect this is related, because in Batch
>> execution mode Flink sorts the records by key (and this may be an
>> unstable sort). Could you try running the job in Streaming mode and see
>> what happens?
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Can anyone advise on this?
>>>
>>> I wrote a simple test of the countWindow method (in Kotlin), shown below:
>>>
>>> package aero.airlab.flinkjobs.headingreminder
>>>
>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import kotlin.random.Random
>>>
>>> object CountWindowTest {
>>>     @JvmStatic
>>>     fun main(args: Array<String>) {
>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>
>>>         val rand = Random(0)
>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>             .keyBy { it.first }
>>>             .countWindow(3L, 1)
>>>             .reduce { a, b -> b }
>>>             .keyBy { it.first }
>>>             .filter { it.first == 5 }
>>>             .print()
>>>
>>>         env.execute()
>>>     }
>>> }
>>>
>>>
>>> The output begins as follows:
>>>
>>> 12> (5, 184)
>>> 12> (5, 18)
>>> 12> (5, 29)
>>> 12> (5, 37)
>>> 12> (5, 38)
>>> 12> (5, 112)
>>> 12> (5, 131)
>>>
>>> The first line, (5, 184), is out of order relative to the rest.
>>>
>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>> reduce.
>>>
>>> Thanks.
>>>
>>