[
https://issues.apache.org/jira/browse/FLINK-36751?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17901589#comment-17901589
]
Roman Khachatryan commented on FLINK-36751:
-------------------------------------------
Merged into master as 0c338ae6d21e6d3dc79764e7801cbb7adfe6a44f
into 1.20 as 98045aad2c0ace527893bb20a9daa054aa48adbf
into 1.19 as d917462ee6492d83559d9e602a82230a850272a1
> PausableRelativeClock does not pause when the source only has one split
> -----------------------------------------------------------------------
>
> Key: FLINK-36751
> URL: https://issues.apache.org/jira/browse/FLINK-36751
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream
> Affects Versions: 1.20.0, 1.19.1, 2.0-preview
> Reporter: haishui
> Assignee: haishui
> Priority: Major
> Labels: pull-request-available
> Fix For: 2.0.0, 1.19.2, 1.20.1
>
>
> Reason:
> PausableRelativeClock#pause is called at pauseOrResumeSplits in
> ProgressiveTimestampsAndWatermarks/SourceOperator, which is only called when
> the sourceOperator has more than one splits.
>
> My example code tested on Flink 1.20-SNAPSHOT is as follows:
> {code:java}
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setParallelism(1);
> DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(v -> v,
> Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
> DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(v -> v + 500,
> Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);
> WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
> .<Long>forMonotonousTimestamps()
> .withTimestampAssigner((aLong, l) -> aLong)
> .withWatermarkAlignment("default", Duration.ofMillis(5),
> Duration.ofSeconds(5))
> .withIdleness(Duration.ofSeconds(5));
> DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
> DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
> s1.print("S1");
> s2.print("S2");
> s1.keyBy(v -> 0)
> .connect(s2.keyBy(v -> 0))
> .process(new CoProcessFunction<Long, Long, Void>() {
> @Override
> public void processElement1(Long aLong, CoProcessFunction<Long,
> Long, Void>.Context context, Collector<Void> collector) throws Exception {
> if (context.timestamp() <
> context.timerService().currentWatermark()) {
> throw new IllegalStateException("left stream element is
> late: " + aLong);
> }
> } @Override
> public void processElement2(Long aLong, CoProcessFunction<Long,
> Long, Void>.Context context, Collector<Void> collector) throws Exception {
> if (context.timestamp() <
> context.timerService().currentWatermark()) {
> throw new IllegalStateException("right stream element is
> late: " + aLong);
> }
> }
> });
> env.execute();{code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)