[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17444280#comment-17444280 ]
Guowei Ma commented on FLINK-24767:
-----------------------------------

I think this may be because, in BATCH execution mode, Flink sorts the records by key, and that sort might be unstable.

> A keyBy following countWindow does not preserve order within the same partition
> -------------------------------------------------------------------------------
>
>                 Key: FLINK-24767
>                 URL: https://issues.apache.org/jira/browse/FLINK-24767
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Network
>    Affects Versions: 1.13.3
>            Reporter: Lee Y S
>            Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin), shown below:
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>         val rand = Random(0)
>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>         env.fromCollection(data).assignTimestampsAndWatermarks(
>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>             .keyBy { it.first }
>             .countWindow(3L, 1)
>             .reduce { a, b -> b }
>             .keyBy { it.first }
>             .filter { it.first == 5 }
>             .print()
>         env.execute()
>     }
> }
> {code}
> The beginning of the output is shown below:
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
> The first line, (5, 184), is out of order relative to the rest.
> The problem disappears if I remove the keyBy after the reduce or use stream mode instead of batch mode.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
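
To illustrate the hypothesis in the comment, here is a minimal Kotlin sketch, independent of Flink, of how a sort by key that does not keep ties in input order could produce output like that in the report. The helper `isOrderedWithinKeys` and the sample records are hypothetical, chosen only for illustration. Reversing the input before a stable sort merely simulates a sort that reorders equal keys; it is not Flink's actual sorter, and whether Flink's batch-mode sort behaves this way is exactly what the comment leaves open.

```kotlin
// Hypothetical helper: true if, for every key, the second components
// appear in ascending order in the given list.
fun isOrderedWithinKeys(records: List<Pair<Int, Int>>): Boolean =
    records.groupBy { it.first }
        .values
        .all { group -> group.zipWithNext().all { (a, b) -> a.second <= b.second } }

fun main() {
    // Sample records (key, sequence number), already ordered within each key.
    val data = listOf(5 to 18, 3 to 20, 5 to 29, 5 to 184)

    // Kotlin's sortedBy is stable: records with equal keys keep their input order.
    val stableResult = data.sortedBy { it.first }

    // Simulated tie-reordering sort (assumption for illustration only):
    // reversing first makes equal keys come out in the opposite order.
    val reorderedResult = data.reversed().sortedBy { it.first }

    println(stableResult)                          // [(3, 20), (5, 18), (5, 29), (5, 184)]
    println(isOrderedWithinKeys(stableResult))     // true
    println(reorderedResult)                       // [(3, 20), (5, 184), (5, 29), (5, 18)]
    println(isOrderedWithinKeys(reorderedResult))  // false
}
```

A check like `isOrderedWithinKeys` could be applied to the collected output of the test job to confirm whether the ordering is lost only in batch mode.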