Hi Jason,

Thanks for this very useful KIP. In general I am with Guozhang on the
purpose of the three timeouts:
1) session timeout for consumer liveness,
2) process timeout (or maybe we should rename it to max.poll.interval.ms)
for application liveness,
3) rebalance timeout for faster rebalance in some failure cases.
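
For reference, here is how these could look as consumer configs. Only
session.timeout.ms and heartbeat.interval.ms exist today; process.timeout.ms
(or max.poll.interval.ms) and a dedicated rebalance timeout are proposals in
this discussion, so those names and all of the values below are illustrative
assumptions, not shipped settings:

```properties
# 1) consumer liveness: heartbeats sent from the background thread
session.timeout.ms=10000
heartbeat.interval.ms=3000
# 2) application liveness: max time between poll() calls (proposed in this KIP)
process.timeout.ms=300000
# 3) hypothetical separate bound on rebalance duration (discussed, not in the KIP)
# rebalance.timeout.ms=60000
```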

It seems the current discussion is mainly about whether we need 3) as a
separate timeout or not. The current KIP proposal is to combine 2) and 3),
i.e. just use the process timeout as the rebalance timeout. That means we
need to either increase the rebalance timeout to match the process timeout,
or the reverse. It would be helpful to understand the impact of these two
cases. Here are my two cents.

For users who are consuming data from Kafka, usually they care about
either throughput or latency.

If users care about latency, they would probably care more about average
latency than about 99.99th-percentile latency, which can be affected by
many other, more common causes than consumer failure. Because all the
timeouts we are discussing here only impact the 99.99th-percentile
latency, I don't think it would really make a difference for
latency-sensitive users.

The majority of the use cases for Kafka Connect and Mirror Maker are
throughput sensitive. Ewen raised a good example where Kafka Connect needs
to process the previous data on rebalance and therefore requires a higher
rebalance timeout than process timeout. This is essentially the same in
Mirror Maker, where each rebalance needs to flush all the messages in the
producer's accumulator. That could take some time depending on how many
messages there are. In this case, we may need to increase the process
timeout to match the rebalance timeout. But this is probably fine. The
downside of increasing the process timeout is a longer detection time for
a consumer failure. Detecting a consumer failure a little later has only
limited impact because the rest of the consumers in the same group are
still working fine, so the total throughput is unlikely to drop
significantly. As long as the rebalance does not take longer, it should be
fine. The reason we care more about how fast a rebalance can finish is
that during a rebalance no consumer in the group is consuming, i.e.
throughput is zero. So we want the rebalance to finish as quickly as
possible.

Compared with increasing the process timeout to match the rebalance
timeout, it seems a more common case is that a user wants a longer process
timeout but a smaller rebalance timeout. I am more worried about this
case, where we have to shoehorn the rebalance timeout into the process
timeout. For users who care about throughput, that might cause the
rebalance to take unnecessarily long. Admittedly this only has an impact
when a consumer has a problem during a rebalance, but depending on how
long the process timeout was set, the rebalance could potentially take
forever, like Guozhang mentioned.

I agree with Guozhang that we can start with 1) and 2) and add 3) later if
needed. But adding a rebalance timeout is more involved than just adding a
configuration. It also means the rebalance has to be done in the
background heartbeat thread. Hence we have to synchronize the rebalance
and consumer.poll() like we did in the old consumer. Otherwise users may
lose messages if auto commit is enabled, or a manual commit might fail
after consumer.poll() because the partitions might have been reassigned.
So having a separate rebalance timeout potentially means a big change for
users as well.
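
The synchronization hazard described above (a background-thread rebalance
racing with the user thread's commit) can be sketched in plain Java. This
is a hypothetical simplification, not KafkaConsumer code; the class, method
names, and locking scheme are all made up for illustration:

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch: if rebalances ran on the background heartbeat thread, the rebalance
// and the user thread's commit path would need mutual exclusion, so a commit
// can never interleave with a partition reassignment.
class BackgroundRebalanceSketch {
    private final ReentrantLock coordinatorLock = new ReentrantLock();
    private boolean generationValid = true; // invalidated when partitions are reassigned

    // User thread: a commit is only safe if no rebalance invalidated our assignment.
    String commitOffsets() {
        coordinatorLock.lock();
        try {
            return generationValid ? "committed" : "commit failed: partitions reassigned";
        } finally {
            coordinatorLock.unlock();
        }
    }

    // Background heartbeat thread: revoke and rejoin under the same lock, so the
    // user thread observes either the old assignment or the new one, never a mix.
    void rebalanceInBackground() {
        coordinatorLock.lock();
        try {
            generationValid = false; // partitions revoked; user must re-fetch assignment
        } finally {
            coordinatorLock.unlock();
        }
    }
}
```

Without the shared lock, a commit issued between revocation and reassignment
could silently commit offsets for partitions the consumer no longer owns.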

Thanks,

Jiangjie (Becket) Qin



On Fri, Jun 3, 2016 at 11:45 AM, Jason Gustafson <ja...@confluent.io> wrote:

> Hey Ewen,
>
> I confess your comments caught me off guard. It never occurred to me that
> anyone would ask for a rebalance timeout so that it could be set _larger_
> than the process timeout. Even with buffered or batch processing, I would
> usually expect flushing before a rebalance to take no more time than a
> periodic flush. Otherwise, I'd probably try to see if there was some
> workload I could push into periodic flushes so that rebalances could
> complete faster. But supposing this isn't possible or practical in some
> cases, I'm wondering how limiting it would be in practice to have only the
> one timeout in this case? I'm a little reluctant to add the additional
> timeout since I think most users would not have a strong need to keep a
> tight bound on normal processing time. (I'm also reminded that Jay
> mentioned he might have to dock everyone's pay 5% for each new timeout we
> introduce ;-)
>
> Thanks,
> Jason
>
>
>
>
> On Thu, Jun 2, 2016 at 7:30 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Hi Ewen,
> >
> > I think you are right, the rebalance process could potentially involve
> all
> > the delayed compute / IO. More specifically, this is what I think of the
> > rebalance process:
> >
> > 1. Coordinator decides to rebalance; start ticking based on the rebalance
> > timeout.
> > 2. Consumer realizes a rebalance is needed when calling poll(); triggers
> > onPartitionRevoked().
> > 3. Consumer sends JoinGroupRequest;
> > 4. Coordinator send JoinGroupResponse; start ticking on the leader.
> > 5. Leader compute and send SyncGroupRequest
> > 6. Coordinator send SyncGroupResponse; start ticking on session timeout.
> > 7. Consumer get new assignment; trigger onPartitionAssigned().
> >
> > In the above process: delayed compute / IO is usually done at step 2);
> > workload initialization is usually done in step 7); and some admin work
> > (like in Kafka Streams) are likely to be done in step 5). As in the
> current
> > KIP proposal the rebalance timeout on the coordinator starts ticking at 1)
> > for everyone in the group, and stops ticking at 3); it starts ticking on
> > the leader again at step 4), and stops upon step 5). In this case the delayed
> > compute / IO contained in step 2) is covered by this rebalance timeout.
> >
> > That being said, I think for "worst case", the time of processing a
> single
> > record would still be similar to rebalancing, since both of which could
> > result in completing all delayed compute / IO so far. And since
> "processing
> > timeout" is used to cover the worst case, it should be still OK?
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Thu, Jun 2, 2016 at 5:55 PM, Ewen Cheslack-Postava <e...@confluent.io
> >
> > wrote:
> >
> > > Jason,
> > >
> > > I've been thinking about this more in terms of something like Connect.
> I
> > > think the rebalance timeout may be a bit different from the process
> > > timeout, and even the process timeout is a bit of a misnomer.
> > >
> > > We sort of talk about the process timeout as if it can be an indicator
> of
> > > maximum processing time for a record/batch. This makes sense for a case
> > of
> > > a data-dependent load (i.e. you can only load some data from slow
> storage
> > > after seeing some data) where that load might be very large compared to
> > > normal processing time. It also makes sense if you have auto commit
> > enabled
> > > because you need to be completely finished processing the data before
> > > calling poll() again, so that time before you call another consumer API
> > > actually reflects processing time.
> > >
> > > It might make less sense in cases like streams (or any other app) that
> > > batch writes to disk, or connectors that "process" a message by
> enqueuing
> > > the data, but won't commit offsets until data is flushed, possibly
> during
> > > some other, much later iteration of processing. In this case I think
> > > processing time and rebalance time could potentially differ
> > significantly.
> > > During normal processing, you can potentially pipeline quite a bit,
> > > buffering up changes, flushing as needed, but then only committing once
> > > flushing is complete. But rebalancing is different then -- you *must*
> > > finish flushing all the data or manually choose to discard the data
> > > (presumably by doing something like watching for the process timeout
> you
> > > set and bailing early, only committing the offsets for data you've
> > > flushed). If you have lots of data built up, the cost for rebalancing
> > could
> > > be a *lot* higher than the maximum time you would otherwise see between
> > > calls to consumer APIs to indicate processing progress.
> > >
> > > The thing that makes these cases different is that processing isn't
> > > actually tied to calls to the consumer API. You can queue up /
> pipeline /
> > > defer some of the work. (By the way, this is currently a limitation of
> > sink
> > > connectors that I'm not thrilled about -- offset commit requires a full
> > > flush, whereas some coordination with the sink connector to not
> require a
> > > full flush except on rebalances would be much nicer, albeit more
> > difficult
> > > for sink connectors to implement.)
> > >
> > > -Ewen
> > >
> > >
> > >
> > > On Thu, Jun 2, 2016 at 5:14 PM, Jason Gustafson <ja...@confluent.io>
> > > wrote:
> > >
> > > > Hey Guozhang,
> > > >
> > > > I'm actually not too concerned about the time spent in the rebalance
> > > > callback specifically. Both it and regular processing time in the
> poll
> > > loop
> > > > will delay the rebalance and keep joined consumers idle. However, if
> we
> > > > expose the rebalance timeout, then it would give users the option to
> > > > effectively disable the process timeout while still keeping a maximum
> > bound
> > > > on the rebalance time. If the consumer cannot complete its processing
> > > fast
> > > > enough and rejoin, then it would be evicted. This provides something
> > like
> > > > (2) since the other consumers in the group would be able to complete
> > the
> > > > rebalance and resume work while the evicted consumer would have to
> > > rollback
> > > > progress. This is not too different from rebalancing in the
> background
> > > > which also typically would cause commit failure and rollback (though
> at
> > > > least the consumer stays in the group).
> > > >
> > > > Now that I'm thinking about it more, I'm not sure this would be a
> great
> > > > facility to depend on in practice. It might be OK if just one or two
> of
> > > the
> > > > consumers fall out of the group during the rebalance, but if half the
> > > group
> > > > is regularly getting evicted, it would be a problem. So even if we
> > expose
> > > > the rebalance timeout, the user is still going to have to set it with
> > > some
> > > > idea in mind about how long processing should take.
> > > >
> > > > Thanks,
> > > > Jason
> > > >
> > > > On Thu, Jun 2, 2016 at 2:46 PM, Guozhang Wang <wangg...@gmail.com>
> > > wrote:
> > > >
> > > > > Hi Jason,
> > > > >
> > > > > With the current usage pattern of:
> > > > >
> > > > > while(..) {
> > > > >
> > > > >   consumer.poll(/* where rebalance happens */)
> > > > >
> > > > >   // process messages
> > > > > }
> > > > >
> > > > > ----------
> > > > >
> > > > > And since the rebalance is still on the caller thread, not the
> background
> > > > > thread, if coordinator decides to rebalance while user thread is
> > still
> > > on
> > > > > processing messages, there is no options but we are forced to go
> with
> > > 1)
> > > > > right? I think your / Onur's point here, which I agree with, is that
> > by
> > > > > reusing process timeout as rebalance timeout, if the rebalance
> > callback
> > > > > could take longer time than processing a batch, users need to set
> the
> > > > > timeout value to the higher of the two, hence the callback latency,
> > > which
> > > > > will make detection of processing stallness less effective, right?
> > > > >
> > > > > As I mentioned in my previous email, I feel that this case of
> > > > > "callback function time taking longer than processing a batch" would
> > > > > not be frequent
> > > > > in practice, and the processing timeout would usually be a good
> > higher
> > > > > bound on the callback function latency. If that is true, I'd
> suggest
> > we
> > > > > keep the current proposal and not add a third timeout config for
> > > covering
> > > > > this case.
> > > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Thu, Jun 2, 2016 at 10:40 AM, Jason Gustafson <
> ja...@confluent.io
> > >
> > > > > wrote:
> > > > >
> > > > > > Hey Guozhang,
> > > > > >
> > > > > > I think the problem is that users may not want to sacrifice
> > rebalance
> > > > > > latency because of uncertainty around processing time. As soon
> as a
> > > > > > rebalance begins, there are basically two choices:
> > > > > >
> > > > > > 1. Block the rebalance until all consumers have finished their
> > > current
> > > > > > processing.
> > > > > > 2. Let all consumers rebalance and "rollback" any processing that
> > > could
> > > > > not
> > > > > > be committed before the rebalance completes.
> > > > > >
> > > > > > If you choose option (1), then you have an incentive to keep a
> > > > relatively
> > > > > > tight bound on process.timeout.ms in order to reduce the
> > worst-case
> > > > idle
> > > > > > time during a rebalance. But if you fail to set it high enough,
> > then
> > > > > you'll
> > > > > > get spurious rebalances during normal processing. I think Onur is
> > > > saying
> > > > > > that this still sort of sucks for users. On the other hand, if
> (2)
> > is
> > > > > > acceptable, then users will have more freedom to err on the high
> > side
> > > > > when
> > > > > > setting process.timeout.ms, or even disable it entirely. They
> will
> > > > have
> > > > > to
> > > > > > deal with rolling back any progress which cannot be committed
> after
> > > the
> > > > > > rebalance completes, but maybe this is less of a problem for some
> > > > users?
> > > > > >
> > > > > > Thanks,
> > > > > > Jason
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Wed, Jun 1, 2016 at 10:23 PM, Guozhang Wang <
> wangg...@gmail.com
> > >
> > > > > wrote:
> > > > > >
> > > > > > > Hi Onur, Jason:
> > > > > > >
> > > > > > > Here are some thoughts about reusing process timeout as
> > server-side
> > > > > > > rebalance timeout: First of all, my understanding is that
> > > > > > >
> > > > > > > 1) session timeout is for detecting consumer crash / hard
> > failures
> > > > (in
> > > > > > this
> > > > > > > case the heartbeat thread will be dead as well, hence
> coordinator
> > > > > > realized
> > > > > > > within session timeout value).
> > > > > > >
> > > > > > > 2) process timeout is for checking liveness of the user thread
> > that
> > > > > calls
> > > > > > > the consumer as well as does the processing: when no consumer
> > calls
> > > > are
> > > > > > > made within the process timeout, heartbeat thread stop working
> > and
> > > > > hence
> > > > > > it
> > > > > > > will be detected by coordinator.
> > > > > > >
> > > > > > > 3) a potential server-side rebalance timeout would be used to
> > > detect
> > > > > > > consumer liveness during the rebalance period, in which the
> user
> > > > thread
> > > > > > is
> > > > > > > tied up with the "poll" call and also the callback function, to
> > > > > > > prevent a slow / stalled consumer in their rebalance callback from
> > > > > > > causing the rebalance to take forever.
> > > > > > >
> > > > > > > I think we generally have two cases in practice regarding 3)
> > above:
> > > > > user
> > > > > > > either does almost nothing and hence should never be stalled
> > > (unless
> > > > > > there
> > > > > > > is a long GC), or they do various external IOs for maintaining
> > > their
> > > > > own
> > > > > > > states, for example, which could be taking long or even cause
> the
> > > > > thread
> > > > > > to
> > > > > > > stall. We do not need to worry too much about the former case, and
> > > > > > > as for the latter case, the process timeout value should usually
> > > > > > > be a good upper bound on the rebalance latency.
> > > > > > >
> > > > > > > That being said, if we observe that there is indeed a common
> > usage
> > > > > where
> > > > > > 2)
> > > > > > > and 3) would require very different timeout values which
> > overwhelms
> > > > the
> > > > > > > complexity of three timeout values, we can consider adding a
> > third
> > > > one
> > > > > > > then: it is easier to add more configs later.
> > > > > > >
> > > > > > >
> > > > > > > What do you think?
> > > > > > >
> > > > > > > Guozhang
> > > > > > >
> > > > > > >
> > > > > > > On Tue, May 31, 2016 at 2:35 PM, Jason Gustafson <
> > > ja...@confluent.io
> > > > >
> > > > > > > wrote:
> > > > > > >
> > > > > > > > Hey Onur,
> > > > > > > >
> > > > > > > > Thanks for the detailed response. I think the problem of
> > > > controlling
> > > > > > > > rebalance times is the main (known) gap in the proposal as it
> > > > stands.
> > > > > > > >
> > > > > > > > This burden goes away if you loosen the liveness property by
> > > > having a
> > > > > > > > > required rebalance time and optional processing time where
> > > > > rebalance
> > > > > > > > > happens in the background thread as stated in the KIP.
> > > > > > > >
> > > > > > > >
> > > > > > > > Just to clarify, the current KIP only allows rebalances to
> > > complete
> > > > > in
> > > > > > > the
> > > > > > > > foreground. What I suggested above in reply to Grant was that we
> > > > > > > > could add
> > > > > > > > a separate rebalance timeout setting, the behavior I had in
> > mind
> > > > was
> > > > > to
> > > > > > > let
> > > > > > > > the consumer fall out of the group if the timeout is reached
> > > while
> > > > > the
> > > > > > > > consumer is still processing. I was specifically trying to
> > avoid
> > > > > moving
> > > > > > > the
> > > > > > > > rebalance to the background thread since this significantly
> > > > increases
> > > > > > the
> > > > > > > > complexity of the implementation. We'd also have to think
> about
> > > > > > > > compatibility a bit more. For example, what are the
> > implications
> > > of
> > > > > > > having
> > > > > > > > the rebalance listener execute in a separate thread?
> > > > > > > >
> > > > > > > > Putting that issue aside, I think we need to convince
> ourselves
> > > > that
> > > > > a
> > > > > > > > separate rebalance timeout is really necessary since every
> new
> > > > > timeout
> > > > > > > adds
> > > > > > > > some conceptual noise which all users will see. My thought in
> > > this
> > > > > KIP
> > > > > > > was
> > > > > > > > that users who didn't want the burden of tuning the process
> > > timeout
> > > > > > could
> > > > > > > > use a relatively large value without a major impact because
> > group
> > > > > > > > rebalances themselves will typically be infrequent. The main
> > > > concern
> > > > > is
> > > > > > > for
> > > > > > > > users who have highly variant processing times and want to
> > > ensure a
> > > > > > tight
> > > > > > > > bound on rebalance times (even if it means having to discard
> > some
> > > > > > > > processing that cannot be completed before the rebalance
> > > finishes).
> > > > > > These
> > > > > > > > users will be left trying to tune process.timeout.ms and
> > > > > > > max.poll.records,
> > > > > > > > which is basically the same position they are currently in.
> The
> > > > > problem
> > > > > > > is
> > > > > > > > I don't know how common this case is, so I'm not sure how it
> > > weighs
> > > > > > > against
> > > > > > > > the cost of having an additional timeout that needs to be
> > > > explained.
> > > > > We
> > > > > > > can
> > > > > > > > always add the rebalance timeout later, but it will be tough to
> > > > > > > > remove
> > > > > > > > once it's there. All the same, I'm not that keen on another
> > > > iteration
> > > > > > of
> > > > > > > > this problem, so if we believe this use case is common
> enough,
> > > then
> > > > > > maybe
> > > > > > > > we should add it now.
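
A back-of-envelope version of the process.timeout.ms / max.poll.records
tuning mentioned above, sketched as plain Java. The class name, per-record
time, and the 2x safety margin are illustrative assumptions, not anything
specified by the KIP:

```java
// Rough tuning arithmetic: the worst-case gap between poll() calls is about
// max.poll.records times the per-record processing time, and process.timeout.ms
// must exceed that gap or the consumer is evicted during normal processing.
class PollTimeoutMath {
    // Longest time the user thread may be away from the consumer between polls.
    static long worstCasePollGapMs(long maxPollRecords, long perRecordMs) {
        return maxPollRecords * perRecordMs;
    }

    // A candidate process timeout: the worst-case gap plus a safety margin.
    static long suggestedProcessTimeoutMs(long maxPollRecords, long perRecordMs) {
        return 2 * worstCasePollGapMs(maxPollRecords, perRecordMs);
    }
}
```

For example, 500 records at 20 ms each gives a 10 s worst-case gap, so the
process timeout would need to be set above that; shrinking max.poll.records
is the other lever, which is exactly the position users are in today.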
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Jason
> > > > > > > >
> > > > > > > >
> > > > > > > > On Sat, May 28, 2016 at 3:10 AM, Onur Karaman <
> > > > > > > > onurkaraman.apa...@gmail.com>
> > > > > > > > wrote:
> > > > > > > >
> > > > > > > > > Thanks for the KIP writeup, Jason.
> > > > > > > > >
> > > > > > > > > Before anything else, I just wanted to point out that it's
> > > worth
> > > > > > > > mentioning
> > > > > > > > > the "heartbeat.interval.ms" consumer config in the KIP for
> > > > > > > completeness.
> > > > > > > > > Today this config only starts to kick in if poll is called
> > > > > frequently
> > > > > > > > > enough. A separate heartbeat thread should make this config
> > > > behave
> > > > > > more
> > > > > > > > > like what people would expect: a separate thread sending
> > > > heartbeats
> > > > > > at
> > > > > > > > the
> > > > > > > > > configured interval.
> > > > > > > > >
> > > > > > > > > With this KIP, the relevant configs become:
> > > > > > > > > "max.poll.records" - already exists
> > > > > > > > > "session.timeout.ms" - already exists
> > > > > > > > > "heartbeat.interval.ms" - already exists
> > > > > > > > > "process.timeout.ms" - new
> > > > > > > > >
> > > > > > > > > After reading the KIP several times, I think it would be
> > > helpful
> > > > to
> > > > > > be
> > > > > > > > more
> > > > > > > > > explicit in the desired outcome. Is it trying to make
> faster
> > > > > > > > > best/average/worst case rebalance times? Is it trying to
> make
> > > the
> > > > > > > clients
> > > > > > > > > need less configuration tuning?
> > > > > > > > >
> > > > > > > > > Also it seems that brokers probably still want to enforce
> > > minimum
> > > > > and
> > > > > > > > > maximum rebalance timeouts just as with the minimum and
> > maximum
> > > > > > session
> > > > > > > > > timeouts so DelayedJoins don't stay in purgatory
> > indefinitely.
> > > So
> > > > > > we'd
> > > > > > > > add
> > > > > > > > > new "group.min.rebalance.timeout.ms" and "
> > > > > > > group.max.rebalance.timeout.ms
> > > > > > > > "
> > > > > > > > > broker configs which again might need to be brought up in
> the
> > > > KIP.
> > > > > > > Let's
> > > > > > > > > say we add these bounds. A side-effect of having
> broker-side
> > > > bounds
> > > > > > on
> > > > > > > > > rebalance timeouts in combination with Java clients that make
> > > > > > > > > process
> > > > > > > > > timeouts the same as rebalance timeouts is that the broker
> > > > > > effectively
> > > > > > > > > dictates the max processing time allowed between poll
> calls.
> > > This
> > > > > > > gotcha
> > > > > > > > > exists right now with today's broker-side bounds on session
> > > > > timeouts.
> > > > > > > So
> > > > > > > > > I'm not really convinced that the proposal gets rid of this
> > > > > > > complication
> > > > > > > > > mentioned in the KIP.
> > > > > > > > >
> > > > > > > > > I think the main question to ask is: does the KIP actually
> > > make a
> > > > > > > > > difference?
> > > > > > > > >
> > > > > > > > > It looks like this KIP improves rebalance times
> specifically
> > > when
> > > > > the
> > > > > > > > > client currently has processing times large enough to force
> > > > larger
> > > > > > > > session
> > > > > > > > > timeouts and heartbeat intervals to not be honored.
> > Separating
> > > > > > session
> > > > > > > > > timeouts from processing time means clients can keep their
> "
> > > > > > > > > session.timeout.ms" low so the coordinator can quickly
> > detect
> > > > > > process
> > > > > > > > > failure, and honoring a low "heartbeat.interval.ms" on the
> > > > > separate
> > > > > > > > > heartbeat thread means clients will be quickly notified of
> > > group
> > > > > > > > membership
> > > > > > > > > and subscription changes - all without placing difficult
> > > > > expectations
> > > > > > > on
> > > > > > > > > processing time. But even so, rebalancing through the
> calling
> > > > > thread
> > > > > > > > means
> > > > > > > > > the slowest processing client in the group will still be
> the
> > > rate
> > > > > > > > limiting
> > > > > > > > > step when looking at rebalance times.
> > > > > > > > >
> > > > > > > > > From a usability perspective, the burden still seems like
> it
> > > will
> > > > > be
> > > > > > > > tuning
> > > > > > > > > the processing time to keep the "progress liveness" happy
> > > during
> > > > > > > > rebalances
> > > > > > > > > while still having reasonable upper bounds on rebalance
> > times.
> > > It
> > > > > > still
> > > > > > > > > looks like users have to do almost the exact same tricks as
> > > today
> > > > > > when
> > > > > > > > the
> > > > > > > > > group membership changes due to slow processing times even
> > > > > > > > > though all the
> > > > > > > > > consumers are alive and the topics haven't changed:
> > > > > > > > > 1. Increase the rebalance timeout to give more time for
> > record
> > > > > > > processing
> > > > > > > > > (the difference compared to today is that we bump the
> > rebalance
> > > > > > timeout
> > > > > > > > > instead of session timeout).
> > > > > > > > > 2. Reduce the number of records handled on each iteration
> > with
> > > > > > > > > max.poll.records.
> > > > > > > > >
> > > > > > > > > This burden goes away if you loosen the liveness property
> by
> > > > > having a
> > > > > > > > > required rebalance time and optional processing time where
> > > > > rebalance
> > > > > > > > > happens in the background thread as stated in the KIP.
> > > > > > > > >
> > > > > > > > > On Thu, May 26, 2016 at 12:40 PM, Jason Gustafson <
> > > > > > ja...@confluent.io>
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hey Grant,
> > > > > > > > > >
> > > > > > > > > > Thanks for the feedback. I'm definitely open to including
> > > > > > heartbeat()
> > > > > > > > in
> > > > > > > > > > this KIP. One thing we should be clear about is what the
> > > > behavior
> > > > > > of
> > > > > > > > > > heartbeat() should be when the group begins rebalancing.
> I
> > > > think
> > > > > > > there
> > > > > > > > > are
> > > > > > > > > > basically two options:
> > > > > > > > > >
> > > > > > > > > > 1. heartbeat() simply keeps heartbeating even if the
> group
> > > has
> > > > > > > started
> > > > > > > > > > rebalancing.
> > > > > > > > > > 2. heartbeat() completes the rebalance itself.
> > > > > > > > > >
> > > > > > > > > > With the first option, when processing takes longer than
> > the
> > > > > > > rebalance
> > > > > > > > > > timeout, the member will fall out of the group which will
> > > cause
> > > > > an
> > > > > > > > offset
> > > > > > > > > > commit failure when it finally finishes. However, if
> > > processing
> > > > > > > > finishes
> > > > > > > > > > before the rebalance completes, then offsets can still be
> > > > > > committed.
> > > > > > > On
> > > > > > > > > the
> > > > > > > > > > other hand, if heartbeat() completes the rebalance
> itself,
> > > then
> > > > > > > you'll
> > > > > > > > > > definitely see the offset commit failure for any records
> > > being
> > > > > > > > processed.
> > > > > > > > > > So the first option is sort of biased toward processing
> > > > > completion
> > > > > > > > while
> > > > > > > > > > the latter is biased toward rebalance completion.
> > > > > > > > > >
> > > > > > > > > > I'm definitely not a fan of the second option since it takes
> > away
> > > > the
> > > > > > > > choice
> > > > > > > > > to
> > > > > > > > > > finish processing before rejoining. However, I do see
> some
> > > > > benefit
> > > > > > in
> > > > > > > > the
> > > > > > > > > > first option if the user wants to keep rebalance time low
> > and
> > > > > > doesn't
> > > > > > > > > mind
> > > > > > > > > > being kicked out of the group if processing takes longer
> > > > during a
> > > > > > > > > > rebalance. This may be a reasonable tradeoff since
> consumer
> > > > > groups
> > > > > > > are
> > > > > > > > > > presumed to be stable most of the time. A better option
> in
> > > that
> > > > > > case
> > > > > > > > > might
> > > > > > > > > > be to expose the rebalance timeout to the user directly
> > since
> > > > it
> > > > > > > would
> > > > > > > > > > allow the user to use an essentially unbounded
> > > > > process.timeout.ms
> > > > > > > for
> > > > > > > > > > highly variant processing while still keeping rebalance
> > time
> > > > > > limited.
> > > > > > > > Of
> > > > > > > > > > course, it would be another timeout for the user to
> > > > understand...
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Jason
> > > > > > > > > >
> > > > > > > > > > On Thu, May 26, 2016 at 8:19 AM, Grant Henke <
> > > > > ghe...@cloudera.com>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Jason,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for writing up a proposal (and a thorough one)!
> > This
> > > > is
> > > > > > > > > something
> > > > > > > > > > > that I had been thinking about this week too as I have
> > run
> > > > into
> > > > > > it
> > > > > > > > more
> > > > > > > > > > > than a handful of times now.
> > > > > > > > > > >
> > > > > > > > > > > I like the idea of having a larger processing timeout; that
> > > > > > > > > > > timeout in
> > > > > > > > > > > unison with max.poll.records should in many cases
> > provide a
> > > > > > > > reasonable
> > > > > > > > > > > assurance that the consumer will stay alive.
> > > > > > > > > > >
> > > > > > > > > > > In rejected alternatives "Add a separate API the user
> can
> > > > call
> > > > > to
> > > > > > > > > > indicate
> > > > > > > > > > > liveness" is listed. I think a heartbeat api could be
> > added
> > > > > along
> > > > > > > > with
> > > > > > > > > > > these new timeout configurations and used for
> "advanced"
> > > use
> > > > > > cases
> > > > > > > > > where
> > > > > > > > > > > the processing time could be highly variant and less
> > > > > > predictable. I
> > > > > > > > > > think a
> > > > > > > > > > > place where we might use the heartbeat api in Kafka is
> > > > > > MirrorMaker.
> > > > > > > > > > >
> > > > > > > > > > > Today, I have seen people trying to find ways to
> leverage
> > > the
> > > > > > > > existing
> > > > > > > > > > api
> > > > > > > > > > > to "force" heartbeats by:
> > > > > > > > > > >
> > > > > > > > > > > 1. Calling poll to get the batch of records to process
> > > > > > > > > > > 2. Call pause on all partitions
> > > > > > > > > > > 3. Process the record batch
> > > > > > > > > > > 3a. While processing periodically call poll (which is
> > > > > essentially
> > > > > > > > just
> > > > > > > > > > > heartbeat since it returns no records and is paused)
> > > > > > > > > > > 4. Commit offsets and un-pause
> > > > > > > > > > > 5. Repeat from 1
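
The pause/poll loop Grant lists above can be sketched in Java. The Consumer
interface below is a made-up stand-in for the real client (the point is the
shape of the loop, not the exact KafkaConsumer signatures):

```java
import java.util.List;

// Sketch of the "force heartbeats via pause" workaround described above.
class PausePollSketch {
    // Hypothetical stand-in for the Kafka consumer.
    interface Consumer {
        List<String> poll();   // returns records; returns nothing while paused
        void pauseAll();       // pause fetching on all assigned partitions
        void resumeAll();
        void commitOffsets();
    }

    static int processBatch(Consumer consumer, java.util.function.Consumer<String> processor) {
        List<String> batch = consumer.poll();  // 1. fetch a batch of records
        consumer.pauseAll();                   // 2. pause all partitions
        for (String record : batch) {
            processor.accept(record);          // 3. process the batch...
            consumer.poll();                   // 3a. ...polling periodically: with all
                                               //     partitions paused this returns no
                                               //     records and acts only as a heartbeat
        }
        consumer.commitOffsets();              // 4. commit offsets and un-pause
        consumer.resumeAll();                  // 5. caller repeats from step 1
        return batch.size();
    }
}
```

The trick works because a paused poll still drives the coordinator protocol
without delivering records, at the cost of sprinkling poll calls through the
processing code.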
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Grant
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Wed, May 25, 2016 at 6:32 PM, Jason Gustafson <
> > > > > > > ja...@confluent.io
> > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi All,
> > > > > > > > > > > >
> > > > > > > > > > > > One of the persistent problems we see with the new
> > > consumer
> > > > > is
> > > > > > > the
> > > > > > > > > use
> > > > > > > > > > of
> > > > > > > > > > > > the session timeout in order to ensure progress.
> > Whenever
> > > > > there
> > > > > > > is
> > > > > > > > a
> > > > > > > > > > > delay
> > > > > > > > > > > > in message processing which exceeds the session
> > timeout,
> > > no
> > > > > > > > > heartbeats
> > > > > > > > > > > can
> > > > > > > > > > > > be sent and the consumer is removed from the group.
> We
> > > seem
> > > > > to
> > > > > > > hit
> > > > > > > > > this
> > > > > > > > > > > > problem everywhere the consumer is used (including
> > Kafka
> > > > > > Connect
> > > > > > > > and
> > > > > > > > > > > Kafka
> > > > > > > > > > > > Streams) and we don't always have a great solution.
> > I've
> > > > > > written
> > > > > > > a
> > > > > > > > > KIP
> > > > > > > > > > to
> > > > > > > > > > > > address this problem here:
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+thread
> > > > > > > > > > > > .
> > > > > > > > > > > > Have a look and let me know what you think.
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > > Jason
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > --
> > > > > > > > > > > Grant Henke
> > > > > > > > > > > Software Engineer | Cloudera
> > > > > > > > > > > gr...@cloudera.com | twitter.com/gchenke |
> > > > > > > > linkedin.com/in/granthenke
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > --
> > > > > > > -- Guozhang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> > >
> > >
> > > --
> > > Thanks,
> > > Ewen
> > >
> >
> >
> >
> > --
> > -- Guozhang
> >
>
