Hi David,

If there are fewer splits than subtasks, then in streaming mode the
watermark will no longer advance automatically. If a subtask later picks
up a new split (topic discovery), its watermark will be derived from the
data in that split.
If the user uses a higher parallelism than there are splits (that's not
recommended at all for static assignments), they need to use idleness to
express explicitly that they expect the watermark to progress anyway.
Flink simply cannot infer that piece of information. It tried in the past,
but doing so broke the dynamic topic discovery use case.
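
To illustrate, here is a minimal sketch in plain Java. It is a simplified
model, not Flink's actual implementation: the class name WatermarkModel
and the value returned when all inputs are idle are assumptions made for
this example. The idea is that a downstream operator takes the minimum
watermark over its non-idle inputs, so a subtask without splits that is
not marked idle holds everything back. In a real job, the corresponding
step is withIdleness on the WatermarkStrategy.

```java
/** Simplified model of how a downstream operator combines input watermarks. */
final class WatermarkModel {

    /**
     * Returns the minimum watermark over all inputs that are not idle.
     * Assumption for this sketch: if every input is idle, return
     * Long.MIN_VALUE to signal that there is nothing to advance on.
     */
    static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (idle[i]) {
                continue; // idle inputs are ignored
            }
            anyActive = true;
            min = Math.min(min, watermarks[i]);
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        // Three source subtasks, two splits: the third subtask has no split
        // and has therefore never emitted a watermark.
        long[] watermarks = {1000L, 1200L, Long.MIN_VALUE};

        // Without idleness the empty subtask keeps the result at Long.MIN_VALUE.
        System.out.println(combine(watermarks, new boolean[] {false, false, false}));

        // With the empty subtask marked idle, the minimum of the other two wins.
        System.out.println(combine(watermarks, new boolean[] {false, false, true}));
    }
}
```

If the idle subtask later picks up a split, it becomes active again and
takes part in the minimum as usual.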

We could change KafkaSubscriber to express that it is indeed a static
assignment, and then we could close the source subtasks that are not
needed. But then use cases like topic recreation with additional
partitions would no longer work.

On Mon, Nov 22, 2021 at 11:53 AM David Anderson <dander...@apache.org>
wrote:

> Thanks, Arvid.
>
> Can you clarify how the KafkaSource currently behaves in the situation
> where it starts with fewer partitions than subtasks? Is that the case
> described in FLIP-180 as case 1: "Static assignment + too few splits"? The
> implementation described there (emit MAX_WATERMARK) should yield equivalent
> behavior to the old source -- but that doesn't seem to be what folks are
> experiencing. Or is this case 3: "Dynamic assignment"?
>
> As for where to explain all this, I think somewhere near the paragraph [1]
> explaining how to migrate from FlinkKafkaConsumer would be appropriate.
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/release-notes/flink-1.14/#deprecate-flinkkafkaconsumer
>
> On Mon, Nov 22, 2021 at 11:24 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi David,
>>
>> Yes, that's intentional [1], as it could lead to correctness issues and
>> was used inconsistently across sources. And yes, it should be documented.
>>
>> For now I'd put it in the KafkaSource docs because I'm not sure in which
>> release notes it would fit best. In which release notes would you expect
>> such a disclaimer?
>>
>> [1]
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-180%3A+Adjust+StreamStatus+and+Idleness+definition
>>
>> On Mon, Nov 22, 2021 at 10:37 AM David Anderson <dander...@apache.org>
>> wrote:
>>
>>> I've seen a few questions recently from folks migrating from
>>> FlinkKafkaConsumer to KafkaSource that make me suspect that something has
>>> changed.
>>>
>>> In FlinkKafkaConsumerBase we have this code which sets a source subtask
>>> to idle if all of its partitions are empty when the subtask starts:
>>>
>>>         // mark the subtask as temporarily idle if there are no initial
>>>         // seed partitions; once this subtask discovers some partitions
>>>         // and starts collecting records, the subtask's status will
>>>         // automatically be triggered back to be active.
>>>         if (subscribedPartitionsToStartOffsets.isEmpty()) {
>>>             sourceContext.markAsTemporarilyIdle();
>>>         }
>>>
>>> Unsurprisingly, people have code that depends on this behavior, and
>>> after switching to KafkaSource, their tests or applications are failing to
>>> produce results (because the idle partitions are now holding back the
>>> watermarks). This leads me to believe that KafkaSource does not work the
>>> same way.
>>>
>>> Can someone confirm that the behavior here has changed?
>>>
>>> Was this intentional? Yes, one can use withIdleness to achieve something
>>> similar, but if this is now required, it needs to be documented in the
>>> release notes, etc.
>>>
>>> David
>>>
>>
