[ https://issues.apache.org/jira/browse/FLINK-24767?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18035149#comment-18035149 ]
Jacob Jona Fahlenkamp commented on FLINK-24767:
-----------------------------------------------
Hello, we have also run into this issue while debugging why our program works
correctly in STREAMING mode but not in BATCH mode. We frequently have events
with the same key and timestamp that are correctly ordered in the input but
get scrambled by the unstable sort. As a result, our KeyedProcessFunction
receives events in an unexpected order. It would be great to have the option to
use a stable sort. As it stands, this unfortunately makes BATCH execution mode
with sort shuffle unusable for us.
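
To make the expectation concrete, here is a minimal sketch in plain Kotlin (no Flink involved) of the ordering a stable sort would give. The `Event` type and the `stableSortByKeyAndTimestamp` helper are hypothetical names made up for this illustration. The sketch relies only on the fact that Kotlin's `sortedWith` is documented as a stable sort. It does not reflect how Flink sorts records internally.

```kotlin
// Hypothetical event type for illustration only; not a Flink class.
data class Event(val key: Int, val timestamp: Long, val payload: String)

// Sort by (key, timestamp). Kotlin's sortedWith is a stable sort, so events
// that tie on both key and timestamp keep their relative input order.
fun stableSortByKeyAndTimestamp(events: List<Event>): List<Event> =
    events.sortedWith(compareBy<Event>({ it.key }, { it.timestamp }))

fun main() {
    val input = listOf(
        Event(1, 10L, "first"),
        Event(2, 5L, "other"),
        Event(1, 10L, "second"),
        Event(1, 10L, "third"),
        Event(1, 7L, "earlier"),
    )
    // The three events with key 1 and timestamp 10 stay in the order
    // first, second, third, which is what our function depends on.
    stableSortByKeyAndTimestamp(input).forEach(::println)
}
```

With an unstable sort, the three tied events could come out in any order, which is the behavior we observe in BATCH mode.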
> A keyBy following countWindow does not preserve order within the same
> partition
> -------------------------------------------------------------------------------
>
> Key: FLINK-24767
> URL: https://issues.apache.org/jira/browse/FLINK-24767
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Network
> Affects Versions: 1.13.3
> Reporter: Lee Y S
> Priority: Major
>
> I wrote a simple test of the countWindow method (in Kotlin) as below
> {code:java}
> import org.apache.flink.api.common.RuntimeExecutionMode
> import org.apache.flink.api.common.eventtime.WatermarkStrategy
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
> import kotlin.random.Random
>
> object CountWindowTest {
>     @JvmStatic
>     fun main(args: Array<String>) {
>         val env = StreamExecutionEnvironment.getExecutionEnvironment()
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH)
>         val rand = Random(0)
>         // Pair(random key in 0..9, increasing index that is also used as the timestamp)
>         val data = (0..1000).map { Pair(rand.nextInt(10), it) }
>         env.fromCollection(data).assignTimestampsAndWatermarks(
>             WatermarkStrategy.forMonotonousTimestamps<Pair<Int, Int>>()
>                 .withTimestampAssigner { e, _ -> e.second.toLong() })
>             .keyBy { it.first }
>             .countWindow(3L, 1)
>             .reduce { a, b -> b }
>             // Second keyBy after the reduce: the ordering problem appears only with it
>             .keyBy { it.first }
>             .filter { it.first == 5 }
>             .print()
>         env.execute()
>     }
> }
> {code}
> The beginning of the output is as below
> 12> (5, 184)
> 12> (5, 18)
> 12> (5, 29)
> 12> (5, 37)
> 12> (5, 38)
> 12> (5, 112)
> 12> (5, 131)
> The first line, (5, 184), is out of order relative to the rest.
> The problem disappears if I remove the keyBy after the reduce or use
> STREAMING mode instead of BATCH mode.
>