I've seen a few questions recently from folks migrating from FlinkKafkaConsumer to KafkaSource that make me suspect that something has changed.

In FlinkKafkaConsumerBase we have this code, which marks a source subtask as idle if it has no partitions assigned when it starts:

    // mark the subtask as temporarily idle if there are no initial seed partitions;
    // once this subtask discovers some partitions and starts collecting records, the subtask's
    // status will automatically be triggered back to be active.
    if (subscribedPartitionsToStartOffsets.isEmpty()) {
        sourceContext.markAsTemporarilyIdle();
    }

Unsurprisingly, people have code that depends on this behavior, and after switching to KafkaSource, their tests or applications are failing to produce results (because subtasks without partitions are no longer marked idle and are now holding back the watermarks).

This leads me to believe that KafkaSource does not work the same way. Can someone confirm that the behavior here has changed? Was this intentional?

Yes, one can use withIdleness to achieve something similar, but if this is now required, it needs to be documented in the release notes, etc.

David
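
P.S. To make the watermark issue concrete, here is a toy model of why a subtask that is not marked idle stalls everything downstream. This is only a sketch and not Flink's actual implementation: the class IdleWatermarkSketch, the SubtaskOutput type, and the combinedWatermark helper are all hypothetical names. It assumes the downstream watermark is the minimum over all non-idle inputs, and that a subtask with no partitions never advances past Long.MIN_VALUE.

```java
import java.util.Arrays;
import java.util.List;

public class IdleWatermarkSketch {

    /** Hypothetical stand-in for what one source subtask reports downstream. */
    static final class SubtaskOutput {
        final long watermark; // last watermark emitted by this subtask
        final boolean idle;   // true if the subtask has been marked idle

        SubtaskOutput(long watermark, boolean idle) {
            this.watermark = watermark;
            this.idle = idle;
        }
    }

    /**
     * Toy model of watermark combination: the minimum over all inputs that
     * are not idle. As a simplification, returns Long.MIN_VALUE (no progress)
     * when every input is idle.
     */
    static long combinedWatermark(List<SubtaskOutput> inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (SubtaskOutput in : inputs) {
            if (in.idle) {
                continue; // idle inputs are ignored
            }
            anyActive = true;
            min = Math.min(min, in.watermark);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Subtask 0 reads a partition and has reached watermark 1000.
        SubtaskOutput reading = new SubtaskOutput(1000L, false);

        // Subtask 1 has no partitions, so it never emits a watermark.
        SubtaskOutput emptyNotIdle = new SubtaskOutput(Long.MIN_VALUE, false);
        SubtaskOutput emptyIdle = new SubtaskOutput(Long.MIN_VALUE, true);

        // Not marked idle: the empty subtask holds the watermark back.
        System.out.println(combinedWatermark(Arrays.asList(reading, emptyNotIdle)));

        // Marked idle: the empty subtask is ignored and progress resumes.
        System.out.println(combinedWatermark(Arrays.asList(reading, emptyIdle)));
    }
}
```

In a real job, the workaround I mentioned is to call withIdleness(Duration) on the WatermarkStrategy passed to the source, which has roughly the effect of the idle flag above once the timeout elapses.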