Hi Tony,

Currently, the functionality that you described does not exist in the consumer. When a topic is deleted, as far as I know, the consumer simply considers its partitions unreachable and keeps trying to fetch records from them until they are up again. I'm not entirely sure whether the Kafka API lets us distinguish a removed topic from partitions that are temporarily out of service because Kafka brokers are down; I may need to take a look.
As for the "workaround" that you are using at the moment, you can actually use `KeyedDeserializationSchema#isEndOfStream` for that. When that returns true and the source subtask closes, the Long.MAX_VALUE watermark will be emitted.

Cheers,
Gordon

On Tue, Sep 5, 2017 at 2:50 PM, Tony Wei <tony19920...@gmail.com> wrote:

> Hi,
>
> I have a simple streaming job consuming data from Kafka and use time
> window to aggregate them.
> I am wondering if there is a built-in function to send a max watermark
> when consumer find this topic is not available, so that the window function
> can flush all state to the sink function.
>
> My Kafka version is 0.10.x. Currently, the workaround to me is using
> `TimestampAssigner` to check a specific record as termination message, and
> make the watermark be Long.MAX_VALUE.
> I will send this message to all partitions before I remove that topic.
>
> I would appreciate if anyone has some suggestions. Thank you.
>
> Best Regards,
> Tony Wei
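
For reference, the `isEndOfStream` suggestion in the reply could be sketched roughly as below. This is only an illustration under stated assumptions, not the connector's actual code: `KeyedSchema` is a local stand-in that mirrors just the `deserialize` and `isEndOfStream` methods of Flink's `KeyedDeserializationSchema` so the snippet compiles without Flink on the classpath, and the marker value `__END_OF_TOPIC__` and the class names are hypothetical. In a real job the class would implement Flink's interface (which also requires `getProducedType()`) and be passed to the Kafka consumer.

```java
import java.nio.charset.StandardCharsets;

public class EndOfStreamSketch {

    // Local stand-in (assumption): mirrors only the two relevant methods of
    // Flink's KeyedDeserializationSchema so this sketch runs without Flink.
    interface KeyedSchema<T> {
        T deserialize(byte[] messageKey, byte[] message,
                      String topic, int partition, long offset);

        boolean isEndOfStream(T nextElement);
    }

    // Hypothetical termination marker, written to every partition
    // before the topic is removed.
    static final String END_MARKER = "__END_OF_TOPIC__";

    static class MarkerAwareSchema implements KeyedSchema<String> {
        @Override
        public String deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) {
            // Decode the record value as a UTF-8 string.
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // Signal end of stream when the termination marker is seen.
            return END_MARKER.equals(nextElement);
        }
    }

    public static void main(String[] args) {
        MarkerAwareSchema schema = new MarkerAwareSchema();
        String[] values = {"event-1", "event-2", END_MARKER};
        for (int i = 0; i < values.length; i++) {
            byte[] raw = values[i].getBytes(StandardCharsets.UTF_8);
            String element = schema.deserialize(null, raw, "my-topic", 0, i);
            System.out.println(element + " -> endOfStream=" + schema.isEndOfStream(element));
        }
    }
}
```

With this approach the termination check lives in the deserialization schema rather than in the `TimestampAssigner`, so the assigner no longer has to special-case the marker record.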