WatermarkStrategy.withIdleness works by marking streams that have received
no events for the configured duration as idle. Downstream operators then
ignore those streams and let the watermark progress based only on the
watermarks of the streams that are still active. As you suspected, this
mechanism does not advance the watermark when all of the streams are idle.
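Here is a toy model of that behaviour. It is plain Java with no Flink dependency and hypothetical names, and it is not how Flink implements this internally. The model is an operator with several input channels. Its watermark is the minimum over the channels that are not idle, so once every channel is idle there is nothing left to advance it.

```java
import java.util.Arrays;

/**
 * Toy model (hypothetical, not Flink's implementation) of how an operator with
 * several input channels derives its watermark: the minimum of the latest
 * watermarks of the channels that are still active, ignoring idle channels.
 */
final class IdleAwareWatermarkCombiner {
    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long combinedWatermark = Long.MIN_VALUE;

    IdleAwareWatermarkCombiner(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        idle = new boolean[numChannels];
    }

    /** In this model, a watermark arriving on a channel marks it active again. */
    void onWatermark(int channel, long watermark) {
        idle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        recompute();
    }

    void markIdle(int channel) {
        idle[channel] = true;
        recompute();
    }

    long currentWatermark() {
        return combinedWatermark;
    }

    private void recompute() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // If every channel is idle there is nothing to take the minimum over,
        // so the combined watermark stays where it was. It never moves backwards.
        if (anyActive && min > combinedWatermark) {
            combinedWatermark = min;
        }
    }
}
```

With a single channel, which corresponds to the one-partition setup in the question, calling markIdle(0) leaves the watermark exactly where it was.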

If your goal is to ensure that all of the events are processed and all
event-time timers are fired (and all event-time windows are closed) before
a job ends, Flink already includes a mechanism for this purpose. If you are
using a bounded source, then when that source reaches the end of its input,
a final watermark with the value Watermark.MAX_WATERMARK is emitted
automatically. The --drain option, as in

./bin/flink stop --drain <job-id>

also has this effect [1].
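This toy model shows why the final watermark flushes everything. It is plain Java with hypothetical names, not Flink's timer service. An event-time timer fires once a watermark at or past its timestamp arrives. Watermark.MAX_WATERMARK carries the timestamp Long.MAX_VALUE, so advancing to that value drains every pending timer, including the ones that close event-time windows.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/**
 * Toy model (hypothetical, not Flink's implementation) of event-time timers:
 * a timer registered for time t fires once a watermark >= t arrives.
 */
final class EventTimeTimerModel {
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    private final List<Long> fired = new ArrayList<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        pending.add(timestamp);
    }

    void advanceWatermark(long watermark) {
        if (watermark <= currentWatermark) {
            return; // watermarks never move backwards
        }
        currentWatermark = watermark;
        // Fire timers in timestamp order while the watermark covers them.
        while (!pending.isEmpty() && pending.peek() <= watermark) {
            fired.add(pending.poll());
        }
    }

    List<Long> firedTimers() {
        return fired;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

In this model, a watermark of Long.MAX_VALUE is the only value guaranteed to fire every timer, however far in the future it was registered.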

With a Kafka source, you can arrange for this to happen by having your
Kafka deserializer return true from its isEndOfStream() method.
Alternatively, you could use the new KafkaSource connector included in
Flink 1.12 with its setBounded option.
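Here is a sketch of the isEndOfStream() approach. EndAwareSchema is a hypothetical stand-in so the code compiles without Flink. It mirrors the isEndOfStream(T nextElement) method declared on Flink's DeserializationSchema and KafkaDeserializationSchema, and a real job would implement one of those instead. The sentinel value END_OF_TEST is also an assumption: your test data would need to end with such a record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-in for the shape of Flink's deserialization schema. */
interface EndAwareSchema<T> {
    T deserialize(byte[] message);

    boolean isEndOfStream(T nextElement);
}

/** Hypothetical schema: a record whose value is END_OF_TEST marks the end of input. */
final class SentinelStringSchema implements EndAwareSchema<String> {
    static final String SENTINEL = "END_OF_TEST";

    @Override
    public String deserialize(byte[] message) {
        return new String(message, StandardCharsets.UTF_8);
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return SENTINEL.equals(nextElement);
    }
}

/** Models a source read loop that stops as soon as the schema signals the end. */
final class BoundedReadModel {
    static <T> List<T> readUntilEnd(Iterator<byte[]> records, EndAwareSchema<T> schema) {
        List<T> emitted = new ArrayList<>();
        while (records.hasNext()) {
            T value = schema.deserialize(records.next());
            if (schema.isEndOfStream(value)) {
                break; // in this sketch the sentinel itself is not emitted
            }
            emitted.add(value);
        }
        return emitted;
    }
}
```

The point of the technique is that the source then finishes like a bounded source reaching the end of its input. That is what triggers the final watermark described above.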

On the other hand, if you really did need to advance the watermark despite
a (possibly temporary) total lack of events, you could implement a
watermark strategy that artificially advances the watermark based on the
passage of processing time. You'll find an example in [2], though it hasn't
been updated to use the new watermark strategy interface.
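Here is one possible shape for such a strategy, following the onEvent/onPeriodicEmit structure of the new WatermarkGenerator interface. It is a sketch under assumptions, not a port of [2]. SimpleWatermarkOutput is a hypothetical stand-in for Flink's WatermarkOutput so the code compiles without Flink. The rule used here is just one reasonable choice: behave like bounded out-of-orderness while events arrive, then let the watermark follow processing time once no event has arrived for idleTimeoutMs.

```java
import java.util.function.LongSupplier;

/** Hypothetical stand-in for Flink's WatermarkOutput (not Flink's interface). */
interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
}

/** Hypothetical generator mirroring the shape of Flink's WatermarkGenerator. */
final class ProcessingTimeAdvancingGenerator {
    private final long maxOutOfOrdernessMs;
    private final long idleTimeoutMs;
    private final LongSupplier clock; // processing time in ms, injectable for tests

    private long maxTimestamp = Long.MIN_VALUE;
    private long lastEventProcessingTime;
    private long lastEmitted = Long.MIN_VALUE;

    ProcessingTimeAdvancingGenerator(long maxOutOfOrdernessMs, long idleTimeoutMs, LongSupplier clock) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastEventProcessingTime = clock.getAsLong();
    }

    /** Tracks the highest event timestamp; watermarks are only emitted periodically. */
    void onEvent(long eventTimestamp, SimpleWatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
        lastEventProcessingTime = clock.getAsLong();
    }

    void onPeriodicEmit(SimpleWatermarkOutput output) {
        if (maxTimestamp == Long.MIN_VALUE) {
            return; // no events yet: nothing to base a watermark on
        }
        long watermark = maxTimestamp - maxOutOfOrdernessMs - 1;
        long idleFor = clock.getAsLong() - lastEventProcessingTime;
        if (idleFor > idleTimeoutMs) {
            // Treat processing time elapsed beyond the timeout as elapsed event time.
            watermark += idleFor - idleTimeoutMs;
        }
        if (watermark > lastEmitted) { // only emit when the watermark moves forward
            lastEmitted = watermark;
            output.emitWatermark(watermark);
        }
    }
}
```

To use this idea in a real job, you would implement Flink's WatermarkGenerator<T> with the same logic and pass it via WatermarkStrategy.forGenerator(...). The trade-off is that events arriving after a long gap may already be behind the artificially advanced watermark, so they will be treated as late.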

Regards,
David

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/cli.html#stopping-a-job-gracefully-creating-a-final-savepoint
[2]
https://github.com/aljoscha/flink/blob/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/timestamps/ProcessingTimeTrailingBoundedOutOfOrdernessTimestampExtractor.java

On Fri, Mar 12, 2021 at 9:47 AM Dan Hill <quietgol...@gmail.com> wrote:

> I haven't been able to get WatermarkStrategy.withIdleness to work.  Is it
> broken?  None of my timers trigger when I'd expect idleness to take over.
>
> On Tue, Mar 2, 2021 at 11:15 PM Dan Hill <quietgol...@gmail.com> wrote:
>
>> Hi.
>>
>> For local development and tests, I want to flush the events in my system
>> to make sure I'm processing everything.  My watermark does not progress
>> far enough to finish processing all of the data.
>>
>> What's the best practice for local development or tests?
>>
>> If I use idleness with a single Kafka partition, it appears broken.  I'm
>> guessing there is logic that prevents an idle partition from being
>> removed if it's the only partition.  Is there a version of this I can
>> enable for local development that supports 1 partition?
>>
>> I see this tech talk.  Are there other talks to watch?
>> https://www.youtube.com/watch?v=bQmz7JOmE_4&feature=youtu.be
>>
>> Do I need to write my own watermark generator?  Or change my test data to
>> have a way of generating watermarks?
>>
>> I've tried a few variants of the following source code.  The watermark
>> doesn't progress in the operator right after creating the source.
>>
>> SingleOutputStreamOperator<T> viewInput = env.addSource(...)
>>         .uid("source-view")
>>         .assignTimestampsAndWatermarks(
>>                 WatermarkStrategy.<T>forBoundedOutOfOrderness(Duration.ofSeconds(1))
>>                         .withIdleness(Duration.ofMinutes(1)));
>>
>
