haishui created FLINK-36751:
-------------------------------
Summary: PausableRelativeClock does not pause when the source only has one split
Key: FLINK-36751
URL: https://issues.apache.org/jira/browse/FLINK-36751
Project: Flink
Issue Type: Sub-task
Components: API / DataStream
Reporter: haishui
Reason:
PausableRelativeClock#pause is called from pauseOrResumeSplits in
ProgressiveTimestampsAndWatermarks/SourceOperator, and pauseOrResumeSplits is
only invoked when the SourceOperator has more than one split. With a single
split the clock is therefore never paused.
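The gating described above can be illustrated with a small standalone model. This is only a sketch under stated assumptions, not Flink's implementation: ToyPausableClock, alignSplits, elapsedWhileBlocked and the manually advanced base time are hypothetical names invented for illustration, and the `numSplits <= 1` early return merely models the behaviour reported in this issue.

```java
/** Toy model of the reported behaviour; none of these types are Flink classes. */
class PausableClockSketch {

    /** Hypothetical stand-in for a relative clock that can be paused. */
    static final class ToyPausableClock {
        private long baseNanos = 0;    // manually advanced "wall clock"
        private long pausedTotal = 0;  // total time spent paused so far
        private long pauseStart = -1;  // -1 means "not paused"

        /** Simulates the underlying base clock ticking forward. */
        void advanceBase(long nanos) {
            baseNanos += nanos;
        }

        void pause() {
            if (pauseStart < 0) {
                pauseStart = baseNanos;
            }
        }

        void unPause() {
            if (pauseStart >= 0) {
                pausedTotal += baseNanos - pauseStart;
                pauseStart = -1;
            }
        }

        /** Base time minus all paused time; frozen while paused. */
        long relativeTimeNanos() {
            long now = pauseStart >= 0 ? pauseStart : baseNanos;
            return now - pausedTotal;
        }
    }

    /** Models the gating from the report: with one split the pause call is never reached. */
    static void alignSplits(int numSplits, boolean splitIsAhead, ToyPausableClock clock) {
        if (numSplits <= 1) {
            return; // assumption: split-level pausing is skipped entirely
        }
        if (splitIsAhead) {
            clock.pause();
        } else {
            clock.unPause();
        }
    }

    /** Relative time that elapses while the source is held back for 5 seconds. */
    static long elapsedWhileBlocked(int numSplits) {
        ToyPausableClock clock = new ToyPausableClock();
        alignSplits(numSplits, true, clock);
        clock.advanceBase(5_000_000_000L);
        return clock.relativeTimeNanos();
    }
}
```

In this model, elapsedWhileBlocked(2) returns 0 because the clock was paused, while elapsedWhileBlocked(1) returns 5000000000: the relative clock keeps running even though the source is being held back, which is the situation this issue describes.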
My example code tested on Flink 1.20-SNAPSHOT is as follows:
{code:java}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

DataGeneratorSource<Long> dataGen1 = new DataGeneratorSource<>(
        v -> v, Long.MAX_VALUE, RateLimiterStrategy.perSecond(30), Types.LONG);
DataGeneratorSource<Long> dataGen2 = new DataGeneratorSource<>(
        v -> v + 500, Long.MAX_VALUE, RateLimiterStrategy.perSecond(1), Types.LONG);

WatermarkStrategy<Long> watermarkStrategy = WatermarkStrategy
        .<Long>forMonotonousTimestamps()
        .withTimestampAssigner((aLong, l) -> aLong)
        .withWatermarkAlignment("default", Duration.ofMillis(5), Duration.ofSeconds(5))
        .withIdleness(Duration.ofSeconds(5));

DataStreamSource<Long> s1 = env.fromSource(dataGen1, watermarkStrategy, "S1");
DataStream<Long> s2 = env.fromSource(dataGen2, watermarkStrategy, "S2");
s1.print("S1");
s2.print("S2");

s1.keyBy(v -> 0)
        .connect(s2.keyBy(v -> 0))
        .process(new CoProcessFunction<Long, Long, Void>() {
            @Override
            public void processElement1(Long aLong,
                    CoProcessFunction<Long, Long, Void>.Context context,
                    Collector<Void> collector) throws Exception {
                if (context.timestamp() < context.timerService().currentWatermark()) {
                    throw new IllegalStateException("left stream element is late: " + aLong);
                }
            }

            @Override
            public void processElement2(Long aLong,
                    CoProcessFunction<Long, Long, Void>.Context context,
                    Collector<Void> collector) throws Exception {
                if (context.timestamp() < context.timerService().currentWatermark()) {
                    throw new IllegalStateException("right stream element is late: " + aLong);
                }
            }
        });

env.execute();
{code}