Thanks Jason. I think it is a good feature to add, +1.

As suggested in KIP-32, we'd better keep the end state of the KIP wiki with
finalized implementation details rather than leaving a list of options. I
agree that for both fairness and pre-fetching the simpler approach would be
sufficient most of the time. So could we move the other approach to
"rejected"?

Guozhang

On Wed, Jan 6, 2016 at 6:14 PM, Gwen Shapira <g...@confluent.io> wrote:

> I like the fair-consumption approach you chose - "pull as many records as
> possible from each partition in a similar round-robin fashion", it is very
> intuitive and close enough to fair.
>
> Overall, I'm +1 on the KIP. But you'll need a formal vote :)
>
> On Wed, Jan 6, 2016 at 6:05 PM, Jason Gustafson <ja...@confluent.io>
> wrote:
>
> > Thanks for the suggestion, Ismael. I updated the KIP.
> >
> > -Jason
> >
> > On Wed, Jan 6, 2016 at 6:57 AM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> > > Thanks Jason. I read the KIP and it makes sense to me. A minor
> > > suggestion: in the "Ensuring Fair Consumption" section, there are 3
> > > paragraphs with 2 examples (2 partitions with 100 max.poll.records and
> > > 3 partitions with 30 max.poll.records). I think you could simplify this
> > > by using one of the examples in the 3 paragraphs.
> > >
> > > Ismael
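
[A purely illustrative sketch of the fair round-robin draining described
earlier in the thread ("pull as many records as possible from each partition
in a similar round-robin fashion"): given records already fetched per
partition and a max.poll.records-style cap, take one record from each
partition in turn. The names are hypothetical and are not the consumer's
actual internals.]

```java
import java.util.*;

public class RoundRobinDrain {
    // Take up to maxRecords, one record per partition per round,
    // so no single partition can dominate the returned batch.
    static <T> List<T> drain(List<Deque<T>> partitions, int maxRecords) {
        List<T> out = new ArrayList<>();
        boolean progress = true;
        while (out.size() < maxRecords && progress) {
            progress = false;
            for (Deque<T> p : partitions) {
                if (out.size() >= maxRecords) break;
                T rec = p.poll();
                if (rec != null) { out.add(rec); progress = true; }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Deque<String> p0 = new ArrayDeque<>(Arrays.asList("a0", "a1", "a2"));
        Deque<String> p1 = new ArrayDeque<>(Arrays.asList("b0"));
        // Interleaves the partitions rather than exhausting p0 first.
        System.out.println(drain(Arrays.asList(p0, p1), 3)); // [a0, b0, a1]
    }
}
```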
> > >
> > > On Tue, Jan 5, 2016 at 7:32 PM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > I've updated the KIP with some implementation details. I also added
> > > > more discussion on the heartbeat() alternative. The short answer for
> > > > why we rejected this API is that it doesn't seem to work well with
> > > > offset commits. This would tend to make correct usage complicated and
> > > > difficult to explain. Additionally, we don't see any clear advantages
> > > > over having a way to set the max records. For example, using
> > > > max.records=1 would be equivalent to invoking heartbeat() on each
> > > > iteration of the message processing loop.
> > > >
> > > > Going back to the discussion on whether we should use a configuration
> > > > value or overload poll(), I'm leaning toward the configuration option
> > > > mainly for compatibility and to keep the KafkaConsumer API from
> > > > getting any more complex. Also, as others have mentioned, it seems
> > > > reasonable to want to tune this setting in the same place that the
> > > > session timeout and heartbeat interval are configured. I still feel a
> > > > little uncomfortable with the need to do a lot of configuration
> > > > tuning to get the consumer working for a particular environment, but
> > > > hopefully the defaults are conservative enough that most users won't
> > > > need to. However, if it remains a problem, then we could still look
> > > > into better options for managing the size of batches, including
> > > > overloading poll() with a max records argument or possibly by
> > > > implementing a batch scaling algorithm internally.
> > > >
> > > > -Jason
> > > >
> > > >
> > > > On Mon, Jan 4, 2016 at 12:18 PM, Jason Gustafson <ja...@confluent.io
> >
> > > > wrote:
> > > >
> > > > > Hi Cliff,
> > > > >
> > > > > I think we're all agreed that the current contract of poll() should
> > > > > be kept. The consumer wouldn't wait for max messages to become
> > > > > available in this proposal; it would only ensure that it never
> > > > > returns more than max messages.
> > > > >
> > > > > -Jason
> > > > >
> > > > > On Mon, Jan 4, 2016 at 11:52 AM, Cliff Rhyne <crh...@signal.co>
> > wrote:
> > > > >
> > > > >> Instead of a heartbeat, I'd prefer poll() to return whatever
> > > > >> messages the client has.  Either a) I don't care if I get less
> > > > >> than my max message limit or b) I do care and will set a larger
> > > > >> timeout.  Case B is less common than A and is fairly easy to
> > > > >> handle in the application's code.
> > > > >>
> > > > >> On Mon, Jan 4, 2016 at 1:47 PM, Gwen Shapira <g...@confluent.io>
> > > wrote:
> > > > >>
> > > > >> > 1. Agree that TCP window style scaling will be cool. I'll try
> > > > >> > to think of a good excuse to use it ;)
> > > > >> >
> > > > >> > 2. I'm very concerned about the challenges of getting the
> > > > >> > timeouts, heartbeats and max messages right.
> > > > >> >
> > > > >> > Another option could be to expose a "heartbeat" API to
> > > > >> > consumers. If my app is still processing data and is still
> > > > >> > alive, it could initiate a heartbeat to signal it's alive
> > > > >> > without having to handle additional messages.
> > > > >> >
> > > > >> > I don't know if this improves more than it complicates though :(
> > > > >> >
> > > > >> > On Mon, Jan 4, 2016 at 11:40 AM, Jason Gustafson <
> > > ja...@confluent.io>
> > > > >> > wrote:
> > > > >> >
> > > > >> > > Hey Gwen,
> > > > >> > >
> > > > >> > > I was thinking along the lines of TCP window scaling in order
> > > > >> > > to dynamically find a good consumption rate. Basically you'd
> > > > >> > > start off consuming say 100 records and you'd let it increase
> > > > >> > > until the consumption took longer than half the session
> > > > >> > > timeout (for example). You /might/ be able to achieve the same
> > > > >> > > thing using pause/resume, but it would be a lot trickier since
> > > > >> > > you have to do it at the granularity of partitions. But yeah,
> > > > >> > > database write performance doesn't always scale in a
> > > > >> > > predictable enough way to accommodate this, so I'm not sure
> > > > >> > > how useful it would be in practice. It might also be more
> > > > >> > > difficult to implement since it wouldn't be as clear when to
> > > > >> > > initiate the next fetch. With a static setting, the consumer
> > > > >> > > knows exactly how many records will be returned on the next
> > > > >> > > call to poll() and can send fetches accordingly.
> > > > >> > >
> > > > >> > > On the other hand, I do feel a little wary of the need to tune
> > > > >> > > the session timeout and max messages though since these
> > > > >> > > settings might depend on the environment that the consumer is
> > > > >> > > deployed in. It wouldn't be a big deal if the impact was
> > > > >> > > relatively minor, but getting them wrong can cause a lot of
> > > > >> > > rebalance churn which could keep the consumer from making any
> > > > >> > > progress. It's not a particularly graceful failure.
> > > > >> > >
> > > > >> > > -Jason
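
[The TCP-window-style scaling described above could be sketched as pure
adjustment logic: grow the batch size while the last batch was processed
quickly, and halve it once processing time exceeds half the session timeout.
This is only an illustration of the idea under discussion, not code from any
actual proposal; all names and constants are made up.]

```java
public class BatchScaler {
    // Return the next max-records value given how long the last batch
    // took to process, relative to half the session timeout.
    static int nextMaxRecords(int current, long processingMs,
                              long sessionTimeoutMs, int floor, int ceiling) {
        if (processingMs > sessionTimeoutMs / 2) {
            return Math.max(floor, current / 2);   // back off multiplicatively
        }
        return Math.min(ceiling, current + 100);   // probe upward additively
    }

    public static void main(String[] args) {
        int max = 100;
        // Fast batch (2s against a 30s session timeout): grow.
        max = nextMaxRecords(max, 2000, 30000, 1, 10000);
        System.out.println(max); // 200
        // Slow batch (20s > 15s): halve to avoid rebalance churn.
        max = nextMaxRecords(max, 20000, 30000, 1, 10000);
        System.out.println(max); // 100
    }
}
```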
> > > > >> > >
> > > > >> > > On Mon, Jan 4, 2016 at 10:49 AM, Gwen Shapira <
> > g...@confluent.io>
> > > > >> wrote:
> > > > >> > >
> > > > >> > > > I can't speak to all use-cases, but for the database one, I
> > > > >> > > > think pause-resume will be necessary in any case, and
> > > > >> > > > therefore dynamic batch sizes are not needed.
> > > > >> > > >
> > > > >> > > > Databases are really unpredictable regarding response times -
> > > > >> > > > load and locking can affect them. I'm not sure there's a
> > > > >> > > > good way to know you are going into rebalance hell before it
> > > > >> > > > is too late. So if I were writing code that updates an RDBMS
> > > > >> > > > based on Kafka, I'd pick a reasonable batch size (say 5000
> > > > >> > > > records), and basically pause, batch-insert all records,
> > > > >> > > > commit and resume.
> > > > >> > > >
> > > > >> > > > Does that make sense?
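
[A minimal sketch of the pause / batch-insert / commit / resume control flow
described above. The Sink interface and the in-memory queue below are
stand-ins for a real database and a real KafkaConsumer; only the shape of
the loop is the point, and everything here is hypothetical.]

```java
import java.util.*;

public class PauseBatchCommit {
    interface Sink { void batchInsert(List<String> records); }

    // Drain pending records in fixed-size batches: conceptually, pause
    // fetching, batch-insert, commit offsets, then resume for the next batch.
    static int process(Deque<String> pending, int batchSize, Sink sink) {
        int committed = 0;
        while (!pending.isEmpty()) {
            List<String> batch = new ArrayList<>();
            while (batch.size() < batchSize && !pending.isEmpty())
                batch.add(pending.poll());
            sink.batchInsert(batch);   // one round-trip to the database
            committed += batch.size(); // then commit offsets up to here
        }
        return committed;
    }

    public static void main(String[] args) {
        Deque<String> records = new ArrayDeque<>(Arrays.asList("r1", "r2", "r3"));
        List<List<String>> batches = new ArrayList<>();
        int n = process(records, 2, batches::add);
        System.out.println(n + " " + batches); // 3 [[r1, r2], [r3]]
    }
}
```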
> > > > >> > > >
> > > > >> > > > On Mon, Jan 4, 2016 at 10:37 AM, Jason Gustafson <
> > > > >> ja...@confluent.io>
> > > > >> > > > wrote:
> > > > >> > > >
> > > > >> > > > > Gwen and Ismael,
> > > > >> > > > >
> > > > >> > > > > I agree the configuration option is probably the way to
> > > > >> > > > > go, but I was wondering whether there would be cases where
> > > > >> > > > > it made sense to let the consumer dynamically set max
> > > > >> > > > > messages to adjust for downstream slowness. For example,
> > > > >> > > > > if the consumer is writing consumed records to another
> > > > >> > > > > database, and that database is experiencing heavier than
> > > > >> > > > > expected load, then the consumer could halve its current
> > > > >> > > > > max messages in order to adapt without risking rebalance
> > > > >> > > > > hell. It could then increase max messages as the load on
> > > > >> > > > > the database decreases. It's basically an easier way to
> > > > >> > > > > handle flow control than we provide with pause/resume.
> > > > >> > > > >
> > > > >> > > > > -Jason
> > > > >> > > > >
> > > > >> > > > > On Mon, Jan 4, 2016 at 9:46 AM, Gwen Shapira <
> > > g...@confluent.io
> > > > >
> > > > >> > > wrote:
> > > > >> > > > >
> > > > >> > > > > > The wiki you pointed to is no longer maintained and
> > > > >> > > > > > fell out of sync with the code and protocol.
> > > > >> > > > > >
> > > > >> > > > > > You may want to refer to:
> > > > >> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > > > >> > > > > >
> > > > >> > > > > > On Mon, Jan 4, 2016 at 4:38 AM, Jens Rantil <
> > > > >> jens.ran...@tink.se>
> > > > >> > > > wrote:
> > > > >> > > > > >
> > > > >> > > > > > > Hi guys,
> > > > >> > > > > > >
> > > > >> > > > > > > I realized I never thanked y'all for your input -
> > > > >> > > > > > > thanks!
> > > > >> > > > > > > Jason: I apologize for assuming your stance on the
> > > > >> > > > > > > issue! Feels like we all agreed on the solution. +1
> > > > >> > > > > > >
> > > > >> > > > > > > Follow-up: Jason made a point about defining prefetch
> > > > >> > > > > > > and fairness behaviour in the KIP. I am now working on
> > > > >> > > > > > > putting that down in writing. To be able to do this I
> > > > >> > > > > > > think I need to understand the current prefetch
> > > > >> > > > > > > behaviour in the new consumer API (0.9) a bit better.
> > > > >> > > > > > > Some specific questions:
> > > > >> > > > > > >
> > > > >> > > > > > >    - How does a specific consumer balance incoming
> > > > >> > > > > > >    messages from multiple partitions? Is the consumer
> > > > >> > > > > > >    simply issuing Multi-Fetch requests[1] for the
> > > > >> > > > > > >    assigned partitions of the relevant topics? Or is
> > > > >> > > > > > >    the consumer fetching from one partition at a time
> > > > >> > > > > > >    and balancing between them internally? That is, is
> > > > >> > > > > > >    the responsibility of partition balancing (and
> > > > >> > > > > > >    fairness) on the broker side or the consumer side?
> > > > >> > > > > > >    - Is the above documented somewhere?
> > > > >> > > > > > >
> > > > >> > > > > > > [1]
> > > > >> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Writing+a+Driver+for+Kafka
> > > > >> > > > > > > see "Multi-Fetch".
> > > > >> > > > > > >
> > > > >> > > > > > > Thanks,
> > > > >> > > > > > > Jens
> > > > >> > > > > > >
> > > > >> > > > > > > On Wed, Dec 23, 2015 at 2:44 AM, Ismael Juma <
> > > > >> ism...@juma.me.uk>
> > > > >> > > > > wrote:
> > > > >> > > > > > >
> > > > >> > > > > > > > On Wed, Dec 23, 2015 at 1:24 AM, Gwen Shapira <
> > > > >> > g...@confluent.io
> > > > >> > > >
> > > > >> > > > > > wrote:
> > > > >> > > > > > > >
> > > > >> > > > > > > > > Given the background, it sounds like you'll
> > > > >> > > > > > > > > generally want each call to poll() to return the
> > > > >> > > > > > > > > same number of events (which is the number you
> > > > >> > > > > > > > > planned on having enough memory / time for). It
> > > > >> > > > > > > > > also sounds like tuning the number of events will
> > > > >> > > > > > > > > be closely tied to tuning the session timeout.
> > > > >> > > > > > > > > That is - if I choose to lower the session timeout
> > > > >> > > > > > > > > for some reason, I will have to modify the number
> > > > >> > > > > > > > > of records returned too.
> > > > >> > > > > > > > >
> > > > >> > > > > > > > > If those assumptions are correct, I think a
> > > > >> > > > > > > > > configuration makes more sense.
> > > > >> > > > > > > > > 1. We are unlikely to want this parameter to
> > > > >> > > > > > > > > change during the lifetime of the consumer
> > > > >> > > > > > > > > 2. The correct value is tied to another
> > > > >> > > > > > > > > configuration parameter, so they will be
> > > > >> > > > > > > > > controlled together.
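
[As a purely illustrative example of controlling these settings together in
one place, a consumer properties file might look like the following. The
values are made-up examples, not recommendations or defaults.]

```properties
# The batch cap is tuned alongside the timeouts it interacts with:
session.timeout.ms=30000
heartbeat.interval.ms=3000
# Upper bound on records returned by a single call to poll() (KIP-41):
max.poll.records=500
```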
> > > > >> > > > > > > > >
> > > > >> > > > > > > >
> > > > >> > > > > > > > I was thinking the same thing.
> > > > >> > > > > > > >
> > > > >> > > > > > > > Ismael
> > > > >> > > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > >
> > > > >> > > > > > > --
> > > > >> > > > > > > Jens Rantil
> > > > >> > > > > > > Backend engineer
> > > > >> > > > > > > Tink AB
> > > > >> > > > > > >
> > > > >> > > > > > > Email: jens.ran...@tink.se
> > > > >> > > > > > > Phone: +46 708 84 18 32
> > > > >> > > > > > > Web: www.tink.se
> > > > >> > > > > > >
> > > > >> > > > > >
> > > > >> > > > >
> > > > >> > > >
> > > > >> > >
> > > > >> >
> > > > >>
> > > > >>
> > > > >>
> > > > >> --
> > > > >> Cliff Rhyne
> > > > >> Software Engineering Lead
> > > > >> e: crh...@signal.co
> > > > >> signal.co
> > > > >>
> > > > >
> > > > >
> > > >
> > >
> >
>



-- 
-- Guozhang
