Adding the open source alias.  This email started off as a broader
discussion around the new consumer; I was zooming in on just one aspect:
poll() being the only mechanism for driving the heartbeats.

Yes, the lag is the effect of the problem (not the problem itself).  Monitoring
the lag is important as it is the primary way to tell if the application is
wedged.  There might be other metrics that could capture the same
essence.  Yes, the lag is at the consumer group level, but you can tell that
one of the consumers is messed up if, for example, one of the partitions in the
application starts generating lag while the others are fine.

Monitoring aside, I think the main point of concern is that in the old
consumer most customers don't have to worry about unnecessary rebalances,
and most of the things that they do in their app don't have an impact on
the session timeout (i.e. the only thing that causes rebalances is when
the GC is out of whack).  For the handful of customers who are impacted
by GC-related rebalances, I would imagine that all of them would really
want us to make the system more resilient.  I agree that the GC problem
can't be solved easily in the Java client; however, it appears that now we
would be expecting the consuming applications to be even more careful with
ongoing tuning of the timeouts.  At LinkedIn, we have seen that most Kafka
applications don't have much of a clue about configuring the timeouts and
just end up calling the Kafka team when their application sees rebalances.

The other side effect of poll driving the heartbeats is that we have to
make sure that people don't set a poll timeout that is larger than the
session timeout.  If we had a notion of implicit heartbeats then we could
also automatically make this work for consumers by sending heartbeats at the
appropriate interval even though the customers want to do a long poll.
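
For example, an application that really wants a five-minute long poll has to
break it up into shorter polls today, so that each poll() also drives a
heartbeat well inside the session timeout.  A rough sketch (values are
illustrative, and "consumer" is an already-constructed
KafkaConsumer<byte[], byte[]>):

    long sessionTimeoutMs = 30000;   // illustrative; must match session.timeout.ms
    long longPollDeadline = System.currentTimeMillis() + 300000;  // the long poll the app wants
    ConsumerRecords<byte[], byte[]> records = consumer.poll(sessionTimeoutMs / 3);
    while (records.count() == 0 && System.currentTimeMillis() < longPollDeadline) {
        // each short poll doubles as a heartbeat; a single poll() longer than
        // the session timeout is exactly the situation we have to guard against
        records = consumer.poll(sessionTimeoutMs / 3);
    }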

We could certainly work around this at LinkedIn if we had either a pause()
API or an explicit heartbeat() API on the consumer.
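
For instance, assuming a hypothetical pause(Collection<TopicPartition>) /
resume(Collection<TopicPartition>) pair on the consumer (none of this exists
today; the class and method names below are made up for illustration), the
wrapper could keep poll() going on a dedicated thread while the application
thread drains a queue at its own pace:

    class PollingWrapper {
        private final BlockingQueue<ConsumerRecords<byte[], byte[]>> queue =
            new ArrayBlockingQueue<>(8);                        // illustrative capacity
        private final KafkaConsumer<byte[], byte[]> consumer;   // constructed elsewhere
        private volatile boolean running = true;

        PollingWrapper(KafkaConsumer<byte[], byte[]> consumer) {
            this.consumer = consumer;
        }

        // runs on a dedicated background thread
        void pollLoop() throws InterruptedException {
            while (running) {
                if (queue.remainingCapacity() == 0) {
                    // processing has fallen behind: stop fetching, keep the session alive
                    consumer.pause(consumer.assignment());
                } else {
                    consumer.resume(consumer.assignment());
                }
                ConsumerRecords<byte[], byte[]> batch = consumer.poll(1000); // still heartbeats
                if (batch.count() > 0) {
                    queue.put(batch);   // never blocks: we only fetch when there is room
                }
            }
        }
        // the application thread drains the queue at its own pace via queue.take()
    }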

Would love to hear how other people think about this subject?

Thanks
Kartik



On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <n...@confluent.io> wrote:

> Agree with the dilemma you are pointing out, which is that there are many
> ways the application's message processing could fail and we wouldn't be
> able to model all of those in the consumer's failure detection mechanism.
> So we should try to model as much of it as we can so the consumer's failure
> detection is meaningful.
>
>
>> Point being that the only absolute way to really detect that an app is
>> healthy is to monitor lag. If the lag increases then for sure something is
>> wrong.
>
>
> The lag is merely the effect of the problem, not the problem itself. Lag
> is also a consumer group level concept and the problem we have is being
> able to detect failures at the level of individual consumer instances.
>
> As you pointed out, a consumer that calls poll() is a stronger indicator of
> whether the consumer is alive or not. The dilemma then is who defines what
> a healthy poll() frequency is. No one else but the application owner can
> define what a "normal" processing latency is for their application. Now the
> question is what's the easiest way for the user to define this without
> having to tune and fine-tune this too often. The heartbeat interval
> certainly does not have to be *exactly* the 99th percentile of processing
> latency, but could be in the ballpark plus an error delta. The error delta is
> the application owner's acceptable risk threshold, during which they would be
> OK if the application remains part of the group despite being dead. It is
> ultimately a tradeoff between operational ease and more accurate failure
> detection.
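>
> For example (numbers purely illustrative): if the observed 99th percentile
> per-poll processing latency is around 8 seconds, the owner might settle on
> roughly that ballpark plus a ~50% risk delta:
>
>     Properties props = new Properties();
>     // ~8s observed p99 processing latency + ~50% "acceptable risk" delta
>     props.put("session.timeout.ms", "12000");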
>
>> With quotas the write latencies to Kafka could range from a few
>> milliseconds all the way to tens of seconds.
>
>
> This is actually no different from the GC problem. Most of the time, the
> normal GC falls in the few-ms range, and there are many applications
> even at LinkedIn for which the max GC falls in the multiple-seconds range.
> Note that it also can't be predicted, so it has to be an observed value. One
> way or the other, you have to observe what this acceptable "max" is for
> your application and then set the appropriate timeouts.
>
> Since this is not something that can be automated, this is a config that
> the application owner has to set based on the expected behavior of their
> application. Not wanting to do that ends up with bad consumption
> semantics, where the application process continues to be part of a group,
> owning partitions but not consuming, since it has halted due to a problem.
> Whether or not the design requires them to express that as a poll() frequency
> doesn't change the fact that the application owner has to go through
> the process of measuring and then defining this "max".
>
> The reverse, where they don't do this and the application remains in the
> group despite being dead, is super confusing and frustrating too. So the due
> diligence up front is actually worth it. And as long as the poll() latency and
> processing latency can be monitored, it should be easy to tell the reason
> for a rebalance, whether it is valid or not, and how the timeout should be tuned.
>
> As for the wrapper, KIP-28 is the wrapper in open source that will hide
> this complexity and I agree that LI is unblocked since you can do this in
> TrackerConsumer in the meantime.
>
> Thanks,
> Neha
>
> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
> kparamasi...@linkedin.com> wrote:
>
>> For commit(), I think it should hopefully be an easier discussion, so
>> maybe we can follow up when we meet up next.
>>
>> As far as the heartbeat is concerned, I think the points you discuss are
>> all very valid.
>>
>> GC pauses impacting the heartbeats is a real issue. However, only a small
>> percentage of apps (the memory-hungry ones) get hit by it.
>>
>> The broader issue, whereby the app might not be behaving correctly even if
>> the heartbeats are healthy, is also real.  If the app is calling poll()
>> then the probability that the app is healthy is surely higher.  But this
>> again isn't an absolute measure that the app is processing correctly.
>> In other cases the app might have even died in which case this discussion
>> is moot.
>>
>> Point being that the only absolute way to really detect that an app is
>> healthy is to monitor lag. If the lag increases then for sure something is
>> wrong.
>>
>> The proposal seems to be that the application needs to tune their session
>> timeout based on the 99th percentile of the time they take to process events
>> after every poll.   This turns out to be a nontrivial thing for an
>> application to do. To start with, when an application is new, their numbers
>> are going to be based on tests that they have done on synthetic data.  This
>> often doesn't represent what they will see in production.  Once the app is in
>> production their processing latencies will potentially vary over time.  It
>> is extremely unlikely that the application owner does a careful job of
>> monitoring the 99th percentile of latencies over time and readjusting the
>> settings.  Often the latencies vary because of variance in other services
>> that are called by the consumer as part of processing the events.
>>
>> A case in point would be a simple app which reads events and writes to
>> Kafka.  With quotas the write latencies to Kafka could range from a few
>> milliseconds all the way to tens of seconds.  As the scale of processing
>> for an app increases, the app or that 'user' could now get quotaed.
>> Unless the application owner has carefully tuned the timeout, instead of
>> slowing down gracefully we are now looking at a potential outage where
>> the app gets hit by constant rebalances.
>>
>> If we expose the pause() API then it is possible for us to take care of
>> this in the LinkedIn wrapper, whereby we would keep calling poll on a
>> separate thread periodically and enqueue the messages. When the queue is
>> full we would call pause().
>>
>> In essence we can work around it at LinkedIn; however, I think it is
>> vastly better if we address this in the API, as every major customer will
>> eventually be pained by it.
>>
>> Kartik
>>
>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io> wrote:
>>
>> Hey guys,
>>
>> Happy to discuss. I agree there may be some rough edges and now is
>> definitely the time to clean them up.
>>
>> I'm pretty reluctant to change the threading model or undergo a big API
>> redesign at this point beyond the group management stuff we've discussed in
>> the context of Samza/copycat, which is already a big effort.
>>
>> Overall I agree that we have done a poor job of documenting which APIs
>> block and which don't; when people are surprised it is because we haven't
>> labeled something that will be unintuitive. But the overall style of
>> poll/select-based APIs is quite common in programming, going back to Unix
>> select, so I don't think it's beyond people if explained well (after all we
>> need to mix sync and async APIs, and if we don't say which is which, any
>> scheme will be confusing).
>>
>> For what it's worth, the experience with this API has actually been about
>> 1000x better than the issues people had around intuitiveness with the
>> high-level API. The crazy blocking iterator, impossible internal queue
>> sizing, baroque threading model, etc. have all caused endless amounts of
>> anger. Not to mention that that client effectively disqualifies about 50%
>> of the use cases people want to try to use it for (plus I regularly hear
>> people tell me they've heard not to use it at all for various reasons
>> ranging from data loss to lack of features). It's important to have that
>> context when people need to switch and they say "oh the old way was so
>> simple and the new way complex!" :-)
>>
>> Let me give some context related to your points, based on our previous
>> discussions:
>>
>> For commit, let's discuss, that is easy either way.
>>
>> The motivation for avoiding additional threading was two-fold. First, this
>> client is really intended to be the lowest-level client. There are many,
>> many possible higher-level processing abstractions. One thing we found to
>> be a big problem with the high-level client was that it coupled things
>> everyone must have--failover, etc.--with things that are different in each
>> use case, like the appropriate threading model. If you do this you need to
>> also maintain a thread-free low-level consumer API for people to get around
>> whatever you have done.
>>
>> The second reason was that the internal threading in the client became
>> quite complex. The answer with threading is always that "it won't be
>> complex this time", but it always is.
>>
>> For the heartbeat you correctly describe the downside to coupling
>> heartbeat with poll--the contract is that the application must regularly
>> consume to be considered an active consumer. This allows the possibility of
>> false positive failure detections. However it's important to understand the
>> downside of the alternative. If you do background polling a consumer is
>> considered active as long as it isn't shut down. This leads to all kinds of
>> active consumers that aren't consuming because they have leaked or
>> otherwise stopped but are still claiming partitions and heartbeating. This
>> failure mode is actually far, far worse. If you allow false positives, the
>> user sees the frequent rebalances and knows they aren't consuming
>> frequently enough to be considered active; but if you allow false negatives,
>> you end up having weeks go by before someone notices that a partition has
>> been unconsumed the whole time, at which point the data is gone. Plus of
>> course even if you do this you still have regular false positives anyway
>> from GC pauses (as now). We discussed this in some depth at the time and
>> decided that it is better to have the liveness notion tied to *actual*
>> consumption, which is the real definition of liveness.
>>
>> Cheers,
>>
>> -Jay
>>
>>
>>
>>
>>
>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <okara...@linkedin.com>
>> wrote:
>>
>>> Hi Confluent Team.
>>>
>>> There has recently been a lot of open source activity regarding the new
>>> KafkaConsumer:
>>>
>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>
>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>
>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>
>>>
>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>
>>> We’ve explained the KafkaConsumer API to the Databus, Samza, and some
>>> other teams and we got similar feedback.
>>>
>>> To summarize the feedback we received from other teams:
>>>
>>>    1. The current behavior is not intuitive. For example,
>>>    KafkaConsumer.poll drives everything. The other methods like subscribe,
>>>    unsubscribe, seek, and commit(async) don’t do anything without a
>>>    KafkaConsumer.poll call.
>>>
>>>    2. The semantics of a commit() call should be consistent between sync
>>>    and async operations. Currently, sync commit is a blocking call which
>>>    actually sends out an OffsetCommitRequest and waits for the response upon
>>>    the user’s KafkaConsumer.commit call. However, the async commit is a
>>>    nonblocking call which just queues up the OffsetCommitRequest. The request
>>>    itself is later sent out in the next poll. The teams we talked to found
>>>    this misleading (there is a short sketch of this asymmetry right after
>>>    this list).
>>>
>>>    3. Heartbeats are dependent on user application behavior (i.e. user
>>>    applications calling poll). This can be a big problem as we don’t control
>>>    how different applications behave. For example, we might have an
>>>    application which reads from Kafka and writes to Espresso. If Espresso is
>>>    slow for whatever reason, then rebalances could happen.
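>>>
>>> To show the asymmetry in the second item concretely (commitSync/commitAsync
>>> are just shorthand here for the sync and async variants of
>>> KafkaConsumer.commit):
>>>
>>>     consumer.commitSync();    // sends the OffsetCommitRequest and blocks for the response
>>>     consumer.commitAsync();   // only queues the request; it goes out on the next poll()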
>>>
>>> Generally speaking, we feel that the current KafkaConsumer API design is
>>> more of a wrapper around the old simple consumer, i.e. in the old consumer we
>>> ask users to deal with raw protocols and error handling while in
>>> KafkaConsumer we do that for users. However, for old high-level consumer
>>> users (which are the majority of users), the experience is a noticeable
>>> regression. The old high-level consumer interface is simple and easy to use
>>> for end users, while KafkaConsumer requires users to be aware of many
>>> underlying details and is becoming prohibitive for users to adopt. This is
>>> hinted at by the javadoc growing bigger and bigger.
>>>
>>> We think it's getting to the point where we should take a step back and
>>> look at the big picture.
>>>
>>> The current state of KafkaConsumer is that it's single-threaded. There's
>>> one big KafkaConsumer.poll called by the user which pretty much drives
>>> everything:
>>>
>>> - data fetches
>>>
>>> - heartbeats
>>>
>>> - join groups (new consumer joining a group, topic subscription changes,
>>> reacting to group rebalance)
>>>
>>> - async offset commits
>>>
>>> - executing callbacks
>>>
>>> Given that the selector's poll is being driven by the end user, this
>>> ends up making us educate users on NIO and the consequences of not calling
>>> KafkaConsumer.poll frequently enough:
>>>
>>> - Coordinator will mark the consumer dead
>>>
>>> - async commits won't send
>>>
>>> - callbacks won't fire
>>>
>>> More generally speaking, there are many surprises with the current
>>> KafkaConsumer implementation.
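>>>
>>> To make the coupling concrete, everything above hangs off the single poll()
>>> call in a loop roughly like this (a sketch only; process() stands in for
>>> arbitrary application logic):
>>>
>>>     while (running) {
>>>         // this one call is what sends heartbeats, completes group joins,
>>>         // flushes queued async commits, and fires their callbacks
>>>         ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
>>>         process(records);   // if this stalls past the session timeout,
>>>                             // the coordinator marks the consumer dead
>>>     }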
>>>
>>> Here's what we consider to be the goals of KafkaConsumer:
>>>
>>> - NIO
>>>
>>> - ability to commit, manipulate offsets, and consume messages
>>>
>>> - a way to subscribe to topics (auto group management) or
>>> partitions (explicit group management)
>>>
>>> - no surprises in the user experience
>>>
>>> The last point is the big one that we think we aren't hitting. We think
>>> the most important example is that there should be no requirement for the
>>> end user to consistently call KafkaConsumer.poll in order for all of the above
>>> tasks to happen. We think it would be better to split those tasks into
>>> tasks that should not rely on KafkaConsumer.poll and tasks that should rely
>>> on KafkaConsumer.poll.
>>>
>>> Tasks that should not rely on KafkaConsumer.poll:
>>>
>>> - heartbeats
>>>
>>> - join groups
>>>
>>> - commits
>>>
>>> - executing callbacks
>>>
>>> Only data fetches should rely on KafkaConsumer.poll
>>>
>>> This would help reduce the amount of surprises to the end user.
>>>
>>> We’ve sketched out a proposal and we’ll send it out to you guys early
>>> next week. We’d like to meet up with you at LinkedIn on *July 31, 2015* so
>>> we can talk about it before proposing it to open source.
>>>
>>> Regards,
>>> LinkedIn Kafka Dev Team
>>>
>>
>>
>
>
> --
> Thanks,
> Neha
>
