I've seen a few questions recently from folks migrating from
FlinkKafkaConsumer to KafkaSource that make me suspect that something has
changed.

In FlinkKafkaConsumerBase we have this code, which marks a source subtask as
idle if it has no partitions assigned when the subtask starts:

        // mark the subtask as temporarily idle if there are no initial seed partitions;
        // once this subtask discovers some partitions and starts collecting records, the subtask's
        // status will automatically be triggered back to be active.
        if (subscribedPartitionsToStartOffsets.isEmpty()) {
            sourceContext.markAsTemporarilyIdle();
        }

Unsurprisingly, people have code that depends on this behavior, and after
switching to KafkaSource, their tests or applications are failing to
produce results (apparently because the idle subtasks are now holding back
the watermarks). This leads me to believe that KafkaSource does not work the
same way.
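
To make the suspected mechanism concrete, here is a toy sketch in plain Java.
It is not Flink code: the class and method names are made up for
illustration, and it only models the rule, as I understand it, that a
downstream operator takes the minimum watermark over its inputs that are not
marked idle.

```java
// Toy model only, NOT Flink code. WatermarkSketch and combinedWatermark are
// hypothetical names used to illustrate the min-over-active-inputs rule.
final class WatermarkSketch {

    /**
     * Combines the watermarks reported by upstream source subtasks.
     * Inputs flagged as idle are skipped; if every input is idle, nothing
     * has been reported yet, which is modelled here as Long.MIN_VALUE.
     */
    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold the watermark back
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Subtask 0 has data up to t=1000; subtask 1 has no partitions and
        // therefore never emits a watermark.
        long[] watermarks = {1_000L, Long.MIN_VALUE};

        // Subtask 1 not marked idle: the combined watermark is stuck.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, false}));

        // Subtask 1 marked idle: the combined watermark follows subtask 0.
        System.out.println(combinedWatermark(watermarks, new boolean[] {false, true}));
    }
}
```

In this model, the first call stays at Long.MIN_VALUE and the second returns
1000, which would match the symptom of jobs producing no results.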

Can someone confirm that the behavior here has changed?

Was this intentional? Yes, one can use withIdleness to achieve something
similar, but if this is now required, it needs to be documented in the
release notes, etc.

David
