Hi, Yan
I do not think it is a bug. In general, we cannot assume any particular
order for the records an operator receives as input.
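Here is a minimal plain-Kotlin sketch of why records that share a key can change their relative order. It does not use Flink and is not Flink's actual sort implementation. It uses selection sort as a stand-in for an unstable sort by key and compares it with Kotlin's stable `sortedBy`. It also shows a hypothetical order-independent alternative to `reduce { a, b -> b }`: keep the record with the highest sequence number rather than whichever record happens to arrive last. All function names here are hypothetical.

```kotlin
// Plain-Kotlin simulation, no Flink involved. Records are (key, sequence number).

/** Hypothetical helper: selection sort on the key only, a classic unstable sort. */
fun <V> unstableSortByKey(input: List<Pair<Int, V>>): List<Pair<Int, V>> {
    val a = input.toMutableList()
    for (i in a.indices) {
        var min = i
        for (j in i + 1 until a.size) {
            if (a[j].first < a[min].first) min = j
        }
        // This swap can move a record past other records with the same key.
        val tmp = a[i]; a[i] = a[min]; a[min] = tmp
    }
    return a
}

/** Hypothetical helper: Kotlin's sortedBy is stable, so equal keys keep their input order. */
fun <V> stableSortByKey(input: List<Pair<Int, V>>): List<Pair<Int, V>> =
    input.sortedBy { it.first }

/** Hypothetical helper: pick the "latest" record by sequence number, independent of arrival order. */
fun latestBySequence(records: List<Pair<Int, Int>>): Pair<Int, Int>? =
    records.maxByOrNull { it.second }

fun main() {
    // Arrival order: key 5 with sequence 18, then key 5 with sequence 29, then key 3.
    val arrivals = listOf(5 to 18, 5 to 29, 3 to 7)

    println(unstableSortByKey(arrivals)) // [(3, 7), (5, 29), (5, 18)]
    println(stableSortByKey(arrivals))   // [(3, 7), (5, 18), (5, 29)]

    // "Take the last element" depends on order; "take the max sequence" does not.
    println(unstableSortByKey(arrivals).last())            // (5, 18)
    println(latestBySequence(unstableSortByKey(arrivals))) // (5, 29)
}
```

If downstream logic needs a particular order, it is safer to carry an explicit sequence number or timestamp (like `it.second` in your test) and compare on it than to rely on the order in which records arrive.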
Best,
Guowei


On Wed, Nov 3, 2021 at 3:10 PM Yan Shen <leey...@gmail.com> wrote:

> Yes, it does not happen in streaming mode. Is this considered a bug or is
> it by design?
>
> Thanks!
>
> On Wed, Nov 3, 2021 at 1:58 PM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi
>>
>> I did not run your program, but I see that you are using the Batch
>> execution mode. I suspect that is the cause: in Batch execution mode, Flink
>> sorts the input of keyed operators by key, and this sort may not be stable.
>> Could you try running the same program in Streaming mode and see what
>> happens?
>>
>> Best,
>> Guowei
>>
>>
>> On Wed, Nov 3, 2021 at 12:16 AM Yan Shen <leey...@gmail.com> wrote:
>>
>>> Hi all,
>>>
>>> Can anyone advise on this?
>>>
>>> I wrote a simple test of the countWindow method (in Kotlin):
>>>
>>> package aero.airlab.flinkjobs.headingreminder
>>>
>>> import org.apache.flink.api.common.RuntimeExecutionMode
>>> import org.apache.flink.api.common.eventtime.WatermarkStrategy
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
>>> import kotlin.random.Random
>>>
>>> object CountWindowTest {
>>>     @JvmStatic
>>>     fun main(args: Array<String>) {
>>>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>>>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>>>
>>>         val rand = Random(0)
>>>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>>>         env.fromCollection(data).assignTimestampsAndWatermarks(
>>>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>>>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>>>             .keyBy { it.first }
>>>             .countWindow(3L, 1)
>>>             .reduce { a, b -> b }
>>>             .keyBy { it.first }
>>>             .filter { it.first == 5 }
>>>             .print()
>>>
>>>         env.execute()
>>>     }
>>> }
>>>
>>>
>>> The beginning of the output is:
>>>
>>> 12> (5, 184)
>>> 12> (5, 18)
>>> 12> (5, 29)
>>> 12> (5, 37)
>>> 12> (5, 38)
>>> 12> (5, 112)
>>> 12> (5, 131)
>>>
>>> The first line, (5, 184), is out of order with respect to the rest.
>>>
>>> Is this a bug? The problem disappears if I remove the keyBy after the
>>> reduce.
>>>
>>> Thanks.
>>>
>>
