Hi

I have not run your program myself, but I see that you are using the BATCH
execution mode. I suspect the behaviour is related to this, because in BATCH
execution mode Flink will "sort" the records by key, and this sort might not
be stable. Could you run the same job in STREAMING mode and see whether the
output order changes?
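
For reference, here is a minimal sketch of that experiment. It assumes the rest of your job stays exactly as you posted it; the only change is the runtime mode, and the object name CountWindowStreamingTest is just a placeholder I made up. I have not run it, so treat it as a starting point rather than a verified reproduction.

```kotlin
import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import kotlin.random.Random

// Hypothetical name; same pipeline as the original CountWindowTest.
object CountWindowStreamingTest {
    @JvmStatic
    fun main(args: Array<String>) {
        val env = StreamExecutionEnvironment.getExecutionEnvironment()
        // Only difference from the original test: STREAMING instead of BATCH.
        env.setRuntimeMode(RuntimeExecutionMode.STREAMING)

        // Same seeded input as before, so both runs see identical data.
        val rand = Random(0)
        val data = (0..1000).map { Pair(rand.nextInt(10), it) }

        env.fromCollection(data).assignTimestampsAndWatermarks(
            WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
                .withTimestampAssigner { e, _ -> e.second.toLong() })
            .keyBy { it.first }
            .countWindow(3L, 1)
            .reduce { a, b -> b }
            .keyBy { it.first }
            .filter { it.first == 5 }
            .print()

        env.execute()
    }
}
```

If you would rather not change the code, the mode can also be chosen at submission time through the execution.runtime-mode configuration option. Comparing the first few printed lines for key 5 between the two modes should show whether the ordering difference comes from the BATCH mode.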

Best,
Guowei


On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:

> Hi all,
>
> Can anyone advise on this?
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
>
> package aero.airlab.flinkjobs.headingreminder
>
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>
>         val rand = Random(0)
>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>         env.fromCollection(data).assignTimestampsAndWatermarks(
>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>             .keyBy { it.first }
>             .countWindow(3L, 1)
>             .reduce { a, b -> b }
>             .keyBy { it.first }
>             .filter { it.first == 5 }
>             .print()
>
>         env.execute()
>     }
> }
>
>
> The output begins as follows:
>
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
>
> The first line, (5, 184), is out of order relative to the rest.
>
> Is this a bug? The problem disappears if I remove the keyBy after the
> reduce.
>
> Thanks.
>
