Looking back at the email thread, I think one of the comments from Mayuresh was the 
question of whether this change needs a KIP or not, since the KafkaConsumer does not 
guarantee the end user any ordering, and so there are no changes to the contract with users.

I entered the KIP based on suggestions from the attached email while going through 
the code contribution process. I am not sure what to do next in the KIP process. 
Could anyone please help/advise me on the next steps?

Thanks!

CH

-----Original Message-----
From: ChienHsing Wu <chien...@opentext.com> 
Sent: Wednesday, December 12, 2018 1:05 PM
To: dev@kafka.apache.org
Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Good to know that, Thanks! 

Nonetheless, that introduces additional complexity at the client side for a 
common expectation of receiving records in a more or less fair fashion.

CH

-----Original Message-----
From: Mayuresh Gharat <gharatmayures...@gmail.com>
Sent: Wednesday, December 12, 2018 12:55 PM
To: dev@kafka.apache.org
Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message Consumption 
Across Partitions in KafkaConsumer

Hi ChienHsing,

We are actually working on buffering the already fetched data for paused 
topicPartitions, so ideally it should not have any effect on performance.
Associated jira:
https://issues.apache.org/jira/browse/KAFKA-7548

Thanks,

Mayuresh

On Wed, Dec 12, 2018 at 6:01 AM ChienHsing Wu <chien...@opentext.com> wrote:

> Hi Mayuresh,
>
> Thanks for the input!
>
> Pausing and Resuming are cumbersome and have some undesirable 
> performance impact, since pausing will in effect clean up the completed 
> fetch and resuming will call the broker to retrieve it again.
>
> The way I changed the code was just to parse the completed fetches 
> earlier and ensure the order of retrieval is the same as the completed 
> fetch queue. I did make code changes in the Fetcher class to take the 
> following into account:
>
> 1) exception handling
> 2) ensure the parsed partitions are not included in fetchablePartitions
> 3) clear the buffered data for partitions not in the newly assigned set, in 
> clearBufferedDataForUnassignedPartitions
> 4) close them properly in the close method
>
> Though the consumer does not guarantee an explicit order, KIP-41 (link
> below) did intend to ensure fair distribution, hence the round-robin 
> algorithm in the code. The change I propose is to enhance it.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
>
> As for performance, the change does not add any additional calls to 
> the broker, nor does it introduce significant processing logic; it just 
> parses the completed fetches earlier and keeps a list to manage them.
>
>
> CH
>
> -----Original Message-----
> From: Mayuresh Gharat <gharatmayures...@gmail.com>
> Sent: Tuesday, December 11, 2018 6:58 PM
> To: dev@kafka.apache.org
> Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> Consumption Across Partitions in KafkaConsumer
>
> Hi ChienHsing,
>
> The other way I was thinking this could be done, outside of KafkaConsumer, 
> is by pausing and resuming TopicPartitions (maybe in a round-robin fashion).
> There is a gotcha there, in that you might not know whether the consumer 
> has already fetched data for the remaining partitions.
> Also, I am not sure if we need a KIP for this, as the KafkaConsumer 
> does not guarantee the end user any order, I believe. So if this 
> change goes in, I don't think it's changing the underlying behavior.
> It would be good to check whether this change will impact the performance 
> of the consumer.
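>
> For illustration, a minimal sketch of that pause/resume rotation. The helper
> class and its method names below are hypothetical, made up just to show the
> idea; only the pause/resume/poll calls are the real KafkaConsumer API:
>
>     import java.time.Duration;
>     import java.util.*;
>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.common.TopicPartition;
>
>     // Rotates through the current assignment, pausing every partition
>     // except one so that each poll() drains a single partition at a time.
>     class RoundRobinPoller<K, V> {
>         private final KafkaConsumer<K, V> consumer;
>         private final Deque<TopicPartition> rotation = new ArrayDeque<>();
>
>         RoundRobinPoller(KafkaConsumer<K, V> consumer) {
>             this.consumer = consumer;
>         }
>
>         ConsumerRecords<K, V> pollNextPartition(Duration timeout) {
>             if (rotation.isEmpty())
>                 rotation.addAll(consumer.assignment());
>             if (rotation.isEmpty())
>                 return consumer.poll(timeout); // nothing assigned yet
>
>             TopicPartition current = rotation.pollFirst();
>             rotation.addLast(current); // next call starts on the next partition
>
>             Set<TopicPartition> others = new HashSet<>(consumer.assignment());
>             others.remove(current);
>             consumer.pause(others);                          // stop fetching the rest
>             consumer.resume(Collections.singleton(current)); // fetch only 'current'
>             return consumer.poll(timeout);
>         }
>     }
>
> The gotcha discussed in this thread applies: pausing can discard data the
> consumer has already fetched for the paused partitions, so it may have to be
> re-fetched on resume, and this simple rotation does not react to rebalances.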
>
> Thanks,
>
> Mayuresh
>
>
> On Tue, Dec 11, 2018 at 11:03 AM ChienHsing Wu <chien...@opentext.com>
> wrote:
>
> > Hi Mayuresh,
> >
> > To serve one poll call, the logic greedily gets records from one 
> > completed fetch before including records from the next completed 
> > fetch in the queue, as you described.
> >
> > The algorithm remembers the current completed fetch as the starting one 
> > when serving the next poll call. The net effect is that one completed 
> > fetch will serve as many poll calls as it can before records are 
> > retrieved from any other completed fetches.
> >
> > For example, let's say the consumer has been assigned partitions A, B 
> > and C, and max.poll.records is set to 100. Right now we have 
> > completed fetches for A and B, each with 300 records. It will take 6 
> > poll calls to retrieve all records, and the sequence of retrieved 
> > partitions will be: A, A, A, B, B, B.
> >
> > Ideally, it should alternate between A and B. I was proposing to 
> > move to the next completed fetch for the next poll call, based on the 
> > order in the completed fetch queue, so the order becomes A, B, A, B, A, B.
> > The implementation parses each completed fetch only once.
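> >
> > A rough sketch of that rotation (illustrative only; the class and method
> > names below are made up and do not match the real Fetcher internals):
> >
> >     import java.util.*;
> >     import org.apache.kafka.clients.consumer.ConsumerRecord;
> >
> >     class RotatingFetchBuffer<K, V> {
> >         // Hypothetical stand-in for a parsed completed fetch.
> >         interface ParsedFetch<K, V> {
> >             List<ConsumerRecord<K, V>> take(int max); // remove up to 'max' records
> >             boolean isDrained();
> >         }
> >
> >         // Parsed completed fetches, kept in the order they arrived.
> >         private final Deque<ParsedFetch<K, V>> parsedFetches = new ArrayDeque<>();
> >
> >         List<ConsumerRecord<K, V>> fetchRecords(int maxPollRecords) {
> >             List<ConsumerRecord<K, V>> result = new ArrayList<>();
> >             while (result.size() < maxPollRecords && !parsedFetches.isEmpty()) {
> >                 ParsedFetch<K, V> head = parsedFetches.pollFirst();
> >                 result.addAll(head.take(maxPollRecords - result.size()));
> >                 if (!head.isDrained())
> >                     parsedFetches.addLast(head); // rotate so the next poll
> >                                                  // starts on the next fetch
> >             }
> >             return result;
> >         }
> >     }
> >
> > With completed fetches for A and B holding 300 records each and
> > max.poll.records of 100, this yields A, B, A, B, A, B across six polls.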
> >
> > Thanks, CH
> >
> > -----Original Message-----
> > From: Mayuresh Gharat <gharatmayures...@gmail.com>
> > Sent: Tuesday, December 11, 2018 1:21 PM
> > To: dev@kafka.apache.org
> > Subject: Re: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > Consumption Across Partitions in KafkaConsumer
> >
> > Hi ChienHsing,
> >
> > Thanks for the KIP.
> > It would be great if you can explain with an example, what you mean by "
> > Currently the implementation will return available records starting 
> > from the last partition the last poll call retrieves records from.
> > This leads to unfair patterns of record consumption from multiple partitions."
> >
> > KafkaConsumer sends fetch requests to multiple brokers, gets the 
> > corresponding responses, and puts them into a single queue of 
> > CompletedFetches. It then iterates over this completed-fetch queue and 
> > peels off up to max.poll.records from each completedFetch per poll() 
> > before moving on to the next completedFetch. Also, it does not send a 
> > fetch request for a TopicPartition if we already have buffered data 
> > (completedFetch or nextInlineRecord) for that TopicPartition. It also 
> > moves the TopicPartition to the end of the assignment queue once it has 
> > received data from the broker for that TopicPartition, to maintain a 
> > round-robin fetch sequence for fairness.
> >
> > Thanks,
> >
> > Mayuresh
> >
> > On Tue, Dec 11, 2018 at 9:13 AM ChienHsing Wu 
> > <chien...@opentext.com>
> > wrote:
> >
> > > Jason,
> > >
> > >
> > >
> > > KIP 41 was initiated by you, and this KIP is to change the logic 
> > > discussed in its Ensuring Fair Consumption section:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records#KIP-41:KafkaConsumerMaxRecords-EnsuringFairConsumption
> > > Your input on KIP-387
> > > (https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer)
> > > would be very valuable.
> > >
> > >
> > >
> > > Thanks, ChienHsing
> > >
> > >
> > >
> > > -----Original Message-----
> > > From: ChienHsing Wu <chien...@opentext.com>
> > > Sent: Tuesday, December 04, 2018 11:43 AM
> > > To: dev@kafka.apache.org
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi,
> > >
> > >
> > >
> > > Any comments/updates? I am not sure of the next steps if no one has 
> > > any further comments.
> > >
> > >
> > >
> > > Thanks, CH
> > >
> > >
> > >
> > > -----Original Message-----
> > >
> > > From: ChienHsing Wu
> > > <chien...@opentext.com>
> > >
> > > Sent: Tuesday, November 20, 2018 2:46 PM
> > >
> > > To: dev@kafka.apache.org
> > >
> > > Subject: RE: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi Matt,
> > >
> > >
> > >
> > > Thanks for the feedback.
> > >
> > >
> > >
> > > The issue with the current design is that it stays on the previous 
> > > partition even if the last poll call consumed max.poll.records; it 
> > > will consume all of that partition's records available at the 
> > > consumer side, serving multiple poll calls, before moving to the next 
> > > partition.
> > >
> > >
> > >
> > > Introducing another threshold at the partition level will decrease the 
> > > number of records consumed from one partition within one poll call, 
> > > but it will still use that same partition as the starting one in the 
> > > next poll call.
> > >
> > >
> > >
> > > The same effect can be achieved by setting max.poll.records to 100, I 
> > > believe. The main difference is that the client will need to make 
> > > more poll calls when that value is set to 100, and because of the 
> > > non-blocking nature, I believe the cost of the extra poll calls is not 
> > > significant.
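> > >
> > > For comparison, a minimal sketch of that alternative, which only lowers
> > > the existing setting. The broker address, group id and deserializers are
> > > placeholders, and the snippet assumes the usual
> > > org.apache.kafka.clients.consumer and StringDeserializer imports:
> > >
> > >     Properties props = new Properties();
> > >     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> > >     props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
> > >     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
> > >               StringDeserializer.class.getName());
> > >     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
> > >               StringDeserializer.class.getName());
> > >     // Each poll() now returns at most 100 records, so the client polls
> > >     // more often; the argument above is that the extra polls are cheap.
> > >     props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
> > >     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);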
> > >
> > >
> > >
> > > Further thoughts?
> > >
> > >
> > >
> > > Thanks, CH
> > >
> > >
> > >
> > > -----Original Message-----
> > >
> > > From: Matt Farmer <m...@frmr.me>
> > >
> > > Sent: Monday, November 19, 2018 9:32 PM
> > >
> > > To: dev@kafka.apache.org
> > >
> > > Subject: [EXTERNAL] - Re: [DISCUSS] KIP-387: Fair Message 
> > > Consumption Across Partitions in KafkaConsumer
> > >
> > >
> > >
> > > Hi there,
> > >
> > >
> > >
> > > Thanks for the KIP.
> > >
> > >
> > >
> > > We’ve run into issues with this at Mailchimp, so something that 
> > > addresses this consumption behavior would save us from having to 
> > > always ensure we’re running enough consumers that each consumer has 
> > > only one partition (which is our usual MO).
> > >
> > >
> > >
> > > I wonder though if it would be simpler and more powerful to define 
> > > the maximum number of records the consumer should pull from one 
> > > partition before pulling some records from another?
> > >
> > >
> > >
> > > So if you set max.poll.records to 500 and then some new setting, 
> > > max.poll.records.per.partition, to 100 then the Consumer would 
> > > switch what partition it reads from every 100 records - looping 
> > > back around to the first partition that had records if there 
> > > aren’t 5 or more partitions with records.
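> > >
> > > To make that concrete (max.poll.records.per.partition is purely
> > > hypothetical here; it does not exist in Kafka today):
> > >
> > >     max.poll.records=500
> > >     max.poll.records.per.partition=100
> > >
> > > With three partitions P0, P1 and P2 all having plenty of buffered
> > > records, one poll() would return 100 from P0, 100 from P1 and 100 from
> > > P2, then loop back for another 100 each from P0 and P1 to reach the
> > > 500-record cap.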
> > >
> > >
> > >
> > > What do you think?
> > >
> > >
> > >
> > > On Mon, Nov 19, 2018 at 9:11 AM ChienHsing Wu <chien...@opentext.com> wrote:
> > >
> > >
> > >
> > > > Hi, could anyone please review this KIP?
> > >
> > > >
> > >
> > > > Thanks, ChienHsing
> > >
> > > >
> > >
> > > > From: ChienHsing Wu
> > >
> > > > Sent: Friday, November 09, 2018 1:10 PM
> > >
> > > > To: dev@kafka.apache.org
> > >
> > > > Subject: RE: [DISCUSS] KIP-387: Fair Message Consumption Across
> > >
> > > > Partitions in KafkaConsumer
> > >
> > > >
> > >
> > > > Just to check: Will anyone review this? It's been silent for a week...
> > >
> > > > Thanks, ChienHsing
> > >
> > > >
> > >
> > > > From: ChienHsing Wu
> > >
> > > > Sent: Monday, November 05, 2018 4:18 PM
> > >
> > > > To: 'dev@kafka.apache.org' <dev@kafka.apache.org>
> > >
> > > > Subject: [DISCUSS] KIP-387: Fair Message Consumption Across Partitions 
> > > > in KafkaConsumer
> > >
> > > >
> > >
> > > > Hi, I just put together the KIP page as requested. This email is to 
> > > > start the discussion thread.
> > >
> > > >
> > >
> > > > KIP: KIP-387: Fair Message Consumption Across Partitions in 
> > > > KafkaConsumer
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-387%3A+Fair+Message+Consumption+Across+Partitions+in+KafkaConsumer
> > >
> > > > Pull Request:
> > >
> > > > https://github.com/apache/kafka/pull/5838
> > >
> > > > Jira:
> > >
> > > > https://issues.apache.org/jira/browse/KAFKA-3932
> > >
> > > >
> > >
> > > > Thanks, CH
> > >
> > > >
> > >
> >
> >
> > --
> > -Regards,
> > Mayuresh R. Gharat
> > (862) 250-7125
> >
>
>
> --
> -Regards,
> Mayuresh R. Gharat
> (862) 250-7125
>


--
-Regards,
Mayuresh R. Gharat
(862) 250-7125
--- Begin Message ---
I added you to the list of contributors. You can now assign tickets
to yourself.

Before you start working on this, we need to understand what the actual
issue is in detail. Note that sending fetch requests to partitions and
what poll() returns are two different things by design.

You might also want to read
https://cwiki.apache.org/confluence/display/KAFKA/KIP-227%3A+Introduce+Incremental+FetchRequests+to+Increase+Partition+Scalability

I am not sure if KAFKA-3932 requests a change to the fetch request logic
(which was done already) or to what poll() returns from the buffer, or maybe
both. It would be good to ask for clarification on the ticket to see
what the reporter actually means, and whether the current behavior meets
their requirements, and if not, why.

Overall, this change seems to require a KIP anyway. Hope this helps.


-Matthias


On 10/30/18 9:38 AM, ChienHsing Wu wrote:
> I just looked at the release schedule. I guess 2.2 is around Feb/2019, 
> right?  --CH
> 
> -----Original Message-----
> From: ChienHsing Wu <chien...@opentext.com> 
> Sent: Tuesday, October 30, 2018 10:56 AM
> To: dev@kafka.apache.org
> Subject: RE: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a 
> round robin fashion
> 
> Hi Matthias,
> 
> Sorry about the late reply.
> 
> I have a Jira account; it's chienhsw. I am using the latest version, 2.0. 
> Would it be possible to add this to 2.0 in a minor release?
> 
> Thanks, ChienHsing
> 
> -----Original Message-----
> From: Matthias J. Sax <matth...@confluent.io>
> Sent: Wednesday, October 24, 2018 6:41 PM
> To: dev@kafka.apache.org
> Subject: [EXTERNAL] - Re: KAFKA-3932 - Consumer fails to consume in a round 
> robin fashion
> 
> CH,
> 
> Thanks for contributing to Kafka. Do you have a Jira account already? If yes, 
> what is your account id? If not, you need to create one first and share your 
> id so we can grant permission to self-assign tickets.
> 
> I was just looking into the ticket itself, and it's marked as 0.10.0.0.
> You say you encountered this issue. Do you use a 0.10.0.x version? AFAIK, the 
> consumer was updated in later versions, and the behavior should be different. 
> Before you start working on the ticket, we should verify that it is not 
> already fixed. In that case, we would just resolve the ticket with the 
> corresponding fix version.
> 
> Note that the behavior (at least from my point of view) is not a bug, but 
> addressing it would be an improvement. Thus, if you work on it, the patch 
> would be released with the 2.2.0 version, but _not_ with a potential
> 0.10.0.2 release.
> 
> Does this make sense?
> 
> 
> -Matthias
> 
> On 10/24/18 6:27 AM, ChienHsing Wu wrote:
>> I don't see any comments/concerns. I would like to implement and commit a 
>> fix for this ticket. Could anyone let me know how to request permission to 
>> assign that ticket to myself?
>>
>> Thanks, CH
>>
>> From: ChienHsing Wu
>> Sent: Monday, October 22, 2018 1:40 PM
>> To: 'dev@kafka.apache.org' <dev@kafka.apache.org>
>> Subject: KAFKA-3932 - Consumer fails to consume in a round robin 
>> fashion
>>
>>
>> Hi,
>>
>>
>>
>> I encountered the issue documented in JIRA KAFKA-3932
>> <https://issues.apache.org/jira/browse/KAFKA-3932?jql=text%20~%20%22round%20robin%20consumer%22>.
>> Upon studying the source code and the KIP
>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records>,
>> I think the issue is this statement in the KIP: "As before, we'd keep track of 
>> which partition we left off at so that the next iteration would begin 
>> there." I think it should NOT use the last partition in the next iteration; 
>> it should pick the next one instead.
>>
>> If this behavior is agreeable, the simplest way to impose the order for 
>> picking the next partition is to use the order in which 
>> consumer.internals.Fetcher receives the partition messages, as determined 
>> by the completedFetches queue in that class. To avoid parsing the partition 
>> messages repeatedly, we can save the parsed fetches to a list and track the 
>> next partition to take messages from there.
>>
>> Does this sound like a good approach? If this is not the right place to 
>> discuss the design, please let me know where to engage. If it is agreeable, 
>> I can contribute the implementation.
>>
>>
>>
>> Thanks, CH
>>
>>
> 



--- End Message ---
