Thanks, Arvid.

Can you clarify how the KafkaSource currently behaves in the situation
where it starts with fewer partitions than subtasks? Is that the case
described in FLIP-180 as case 1: "Static assignment + too few splits"? The
implementation described there (emit MAX_WATERMARK) should yield equivalent
behavior to the old source -- but that doesn't seem to be what folks are
experiencing. Or is this case 3: "Dynamic assignment"?
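To make the equivalence question concrete, here is a toy Java model. It is hypothetical and is not Flink code. It encodes the rule that a downstream operator's watermark is the minimum over its non-idle inputs, and compares three cases: a subtask with no partitions that never emits a watermark, the same subtask marked idle (the old FlinkKafkaConsumer behavior), and a subtask that emits MAX_WATERMARK (FLIP-180 case 1). The class, constants, and method are made up for illustration. Real Flink tracks idleness per input channel and over time, which this sketch ignores.

```java
/**
 * Hypothetical, simplified model of how a downstream operator combines
 * watermarks from parallel source subtasks. Not Flink's implementation:
 * it only mirrors the rule "combined watermark = min over non-idle inputs".
 */
public final class WatermarkCombinationSketch {

    /** Stand-in for a subtask that has not emitted any watermark yet. */
    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Stand-in for Watermark.MAX_WATERMARK (timestamp Long.MAX_VALUE). */
    public static final long MAX_WATERMARK = Long.MAX_VALUE;

    private WatermarkCombinationSketch() {}

    /**
     * Returns the minimum watermark over all inputs not marked idle,
     * or NO_WATERMARK if every input is idle (a simplification).
     */
    public static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs do not hold back the combined watermark
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? min : NO_WATERMARK;
    }

    public static void main(String[] args) {
        // Subtask 0 reads a partition and is at t=1000; subtask 1 has no partition.
        long[] stuck = {1000L, NO_WATERMARK};
        // Prints -9223372036854775808: the partition-less subtask holds everything back.
        System.out.println(combinedWatermark(stuck, new boolean[] {false, false}));
        // Prints 1000: marking subtask 1 idle lets the watermark advance.
        System.out.println(combinedWatermark(stuck, new boolean[] {false, true}));
        // Prints 1000: emitting MAX_WATERMARK from subtask 1 has the same effect here.
        long[] maxed = {1000L, MAX_WATERMARK};
        System.out.println(combinedWatermark(maxed, new boolean[] {false, false}));
    }
}
```

In this model, "idle" and "MAX_WATERMARK" are interchangeable for a subtask that will never receive data. That is why case 1 should match the old behavior, and why a subtask that does neither would stall results.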

As for where to explain all this, I think somewhere near the paragraph [1]
explaining how to migrate from FlinkKafkaConsumer would be appropriate.
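Such a migration note could include a sketch along these lines, which follows the withIdleness suggestion in the quoted message below. It assumes Flink 1.14 with the flink-connector-kafka dependency on the classpath. The broker address, topic, group id, 5-second out-of-orderness bound, and 1-minute idle timeout are placeholders, not recommended values.

```java
import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KafkaSourceIdlenessSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder connection settings; adjust for your cluster.
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("input-topic")
                .setGroupId("example-group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();

        // As discussed in this thread, KafkaSource does not automatically mark a
        // subtask idle just because it has no partitions. withIdleness marks an
        // input idle after it sees no records for the given timeout, so it stops
        // holding back downstream watermarks.
        WatermarkStrategy<String> watermarks = WatermarkStrategy
                .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                .withIdleness(Duration.ofMinutes(1));

        DataStream<String> stream = env.fromSource(source, watermarks, "Kafka Source");
        stream.print();

        env.execute("KafkaSource idleness sketch");
    }
}
```

Unlike the old consumer, which marked a partition-less subtask idle immediately, this only takes effect after the idle timeout. Tests that expect results right away may still need a shorter timeout.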

[1]
https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer

On Mon, Nov 22, 2021 at 11:24 AM Arvid Heise <ar...@apache.org> wrote:

> Hi David,
>
> yes, that's intentional [1], as it could lead to correctness issues and it
> was inconsistently used across sources. Yes it should be documented.
>
> For now I'd put it in the KafkaSource docs because I'm not sure in which
> release notes it would fit best. In which release notes would you expect
> such a disclaimer?
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>
> On Mon, Nov 22, 2021 at 10:37 AM David Anderson <dander...@apache.org>
> wrote:
>
>> I've seen a few questions recently from folks migrating from
>> FlinkKafkaConsumer to KafkaSource that make me suspect that something has
>> changed.
>>
>> In FlinkKafkaConsumerBase we have this code, which marks a source subtask
>> as idle if it has no partitions assigned when the subtask starts:
>>
>>         // mark the subtask as temporarily idle if there are no initial seed partitions;
>>         // once this subtask discovers some partitions and starts collecting records, the subtask's
>>         // status will automatically be triggered back to be active.
>>         if (subscribedPartitionsToStartOffsets.isEmpty()) {
>>             sourceContext.markAsTemporarilyIdle();
>>         }
>>
>> Unsurprisingly, people have code that depends on this behavior, and after
>> switching to KafkaSource, their tests or applications are failing to
>> produce results (because the subtasks without partitions are now holding
>> back the watermarks). This leads me to believe that KafkaSource does not
>> work the same way.
>>
>> Can someone confirm that the behavior here has changed?
>>
>> Was this intentional? Yes, one can use withIdleness to achieve something
>> similar, but if this is now required, it needs to be documented in the
>> release notes, etc.
>>
>> David
>>
>
