Hi Chamikara and Lee,

Thanks for the information. I ran more experiments on my local laptop
(Flink Runner in local mode, with the Job Manager and Task Manager running
in the same JVM).
Setup: the input topic has 4 partitions.
1. With parallelism 1: the KafkaIO read drains one partition completely to
0 lag, then moves on to another partition.
2. With parallelism 2: the KafkaIO read reads 2 partitions together, then
moves on to the remaining partitions.
3. With parallelism 4: the KafkaIO read reads all 4 partitions together.
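
As a rough illustration of the three cases above, here is a minimal Python
sketch of how 4 partitions could be spread over the readers for each
parallelism. It is not Beam code: the function name assign_partitions is
hypothetical, and the round-robin strategy is an assumption about how the
desiredNumSplits value from the runner gets turned into per-reader
partition lists. The actual logic in KafkaUnboundedSource may differ.

```python
def assign_partitions(partitions, desired_num_splits):
    """Spread partitions over readers round-robin.

    Assumption: one reader per split, and never more splits than
    partitions. This only models the assignment, not the read order.
    """
    if desired_num_splits < 1:
        raise ValueError("desired_num_splits must be at least 1")
    num_splits = min(desired_num_splits, len(partitions))
    splits = [[] for _ in range(num_splits)]
    for i, partition in enumerate(sorted(partitions)):
        splits[i % num_splits].append(partition)
    return splits


if __name__ == "__main__":
    # 4 partitions, as in the experiment; parallelism 1, 2 and 4.
    for parallelism in (1, 2, 4):
        print(parallelism, assign_partitions(range(4), parallelism))
```

Under these assumptions, parallelism 1 gives a single reader all 4
partitions, parallelism 2 gives each reader 2 partitions, and parallelism 4
gives each reader exactly one. That would match the observations if each
reader tends to drain one of its assigned partitions before the next.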

In production, we run multiple Flink Task Managers. From the reported
consumer lag, we also see some partitions go to 0 while other partitions
remain at high lag.

Thanks!
Eleanore

On Mon, May 11, 2020 at 8:19 PM Heejong Lee <heej...@google.com> wrote:

> If we assume that there's only one reader, all partitions are assigned to
> a single KafkaConsumer. I think the order of reading each partition depends
> on KafkaConsumer implementation i.e. how KafkaConsumer.poll() returns
> messages.
>
> Reference:
> assigning partitions:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L83
> polling records:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L538
> creating a record batch:
> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L614
>
> On Mon, May 11, 2020 at 7:54 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> The number of partitions assigned to a given split depends on the
>> desiredNumSplits value provided by the runner.
>>
>> https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedSource.java#L54
>>
>> (This is assuming that you are using Beam Kafka source not a native Flink
>> override).
>>
>> Do you see the same behavior when you increase the number of workers of
>> your Flink cluster ?
>>
>> On Mon, May 11, 2020 at 5:28 PM Eleanore Jin <eleanore....@gmail.com>
>> wrote:
>>
>>> Hi community,
>>>
>>> In my pipeline, I am using KafkaIO to read and write. The source topic
>>> has 4 partitions and pipeline parallelism is 1.
>>>
>>> I noticed from the consumer lag metrics that it consumes from 1 partition
>>> until all the messages from that partition are processed, then it
>>> consumes from another partition.
>>>
>>> Is this the expected behavior?
>>>
>>> Runner is Flink.
>>>
>>> Thanks a lot!
>>> Eleanore
>>>
>>
