I think this proposal matches pretty well with what user's intuitively
expect the implementation to be. At a glance, I don't see any problems with
doing the liveness detection in the background thread. It also has the
advantage that the frequency of heartbeats (which controls how long
rebalancing takes) can be kept orthogonal to the rate of consumption. One
concern is about the rebalance callback. It would have to be executed in
the background thread, right? That means the user might have to implement
their own synchronization, which might be tricky to do right.

-Jason


On Wed, Jul 29, 2015 at 1:20 PM, Neha Narkhede <n...@confluent.io> wrote:

> Works now. Thanks Becket!
>
> On Wed, Jul 29, 2015 at 1:19 PM, Jiangjie Qin <j...@linkedin.com> wrote:
>
>> Ah... My bad, forgot to change the URL link for pictures.
>> Thanks for the quick response, Neha. It should be fixed now, can you try
>> again?
>>
>> Jiangjie (Becket) Qin
>>
>> On Wed, Jul 29, 2015 at 1:10 PM, Neha Narkhede <n...@confluent.io> wrote:
>>
>>> Thanks Becket. Quick comment - there seem to be a bunch of images that
>>> the wiki refers to, but none loaded for me. Just making sure if its just me
>>> or can everyone not see the pictures?
>>>
>>> On Wed, Jul 29, 2015 at 12:00 PM, Jiangjie Qin <j...@linkedin.com>
>>> wrote:
>>>
>>>> I agree with Ewen that a single threaded model will be tricky to
>>>> implement the same conventional semantic of async or Future. We just
>>>> drafted the following wiki which explains our thoughts in LinkedIn on the
>>>> new consumer API and threading model.
>>>>
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/New+consumer+API+change+proposal
>>>>
>>>> We were trying to see:
>>>> 1. If we can use some kind of methodology to help us think about what
>>>> API we want to provide to user for different use cases.
>>>> 2. What is the pros and cons of current single threaded model. Is there
>>>> a way that we can maintain the benefits while solve the issues we are
>>>> facing now with single threaded model.
>>>>
>>>> Thanks,
>>>>
>>>> Jiangjie (Becket) Qin
>>>>
>>>> On Tue, Jul 28, 2015 at 10:28 PM, Ewen Cheslack-Postava <
>>>> e...@confluent.io> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Tue, Jul 28, 2015 at 5:18 PM, Guozhang Wang <wangg...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I think Ewen has proposed these APIs for using callbacks along with
>>>>>> returning future in the commit calls, i.e. something similar to:
>>>>>>
>>>>>> public Future<void> commit(ConsumerCommitCallback callback);
>>>>>>
>>>>>> public Future<void> commit(Map<TopicPartition, Long> offsets,
>>>>>> ConsumerCommitCallback callback);
>>>>>>
>>>>>> At that time I was slightly intending not to include the Future
>>>>>> besides adding the callback mainly because of the implementation 
>>>>>> complexity
>>>>>> I feel it could introduce along with the retry settings after looking
>>>>>> through the code base. I would happy to change my mind if we could 
>>>>>> propose
>>>>>> a prototype implementation that is simple enough.
>>>>>>
>>>>>>
>>>>> One of the reasons that interface ended up being difficult (or maybe
>>>>> impossible) to make work reasonably is because the consumer was 
>>>>> thread-safe
>>>>> at the time. That made it impossible to know what should be done when
>>>>> Future.get() is called -- should the implementation call poll() itself, or
>>>>> would the fact that the user is calling get() imply that there's a
>>>>> background thread running the poll() loop and we just need to wait for it?
>>>>>
>>>>> The consumer is no longer thread safe, but I think the same problem
>>>>> remains because the expectation with Futures is that they are thread safe.
>>>>> Which means that even if the consumer isn't thread safe, I would expect to
>>>>> be able to hand that Future off to some other thread, have the second
>>>>> thread call get(), and then continue driving the poll loop in my thread
>>>>> (which in turn would eventually resolve the Future).
>>>>>
>>>>> I quite dislike the sync/async enum. While both operations commit
>>>>> offsets, their semantics are so different that overloading a single method
>>>>> with both is messy. That said, I don't think we should consider this an
>>>>> inconsistency wrt the new producer API's use of Future because the two 
>>>>> APIs
>>>>> have a much more fundamental difference that justifies it: they have
>>>>> completely different threading and execution models.
>>>>>
>>>>> I think a Future-based API only makes sense if you can guarantee the
>>>>> operations that Futures are waiting on will continue to make progress
>>>>> regardless of what the thread using the Future does. The producer API 
>>>>> makes
>>>>> that work by processing asynchronous requests in a background thread. The
>>>>> new consumer does not, and so it becomes difficult/impossible to implement
>>>>> the Future correctly. (Or, you have to make assumptions which break other
>>>>> use cases; if you want to support the simple use case of just making a
>>>>> commit() synchronous by calling get(), the Future has to call poll()
>>>>> internally; but if you do that, then if any user ever wants to add
>>>>> synchronization to the consumer via some external mechanism, then the
>>>>> implementation of the Future's get() method will not be subject to that
>>>>> synchronization and things will break).
>>>>>
>>>>> -Ewen
>>>>>
>>>>>
>>>>>> Guozhang
>>>>>>
>>>>>> On Tue, Jul 28, 2015 at 4:03 PM, Neha Narkhede <n...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Hey Adi,
>>>>>>>
>>>>>>> When we designed the initial version, the producer API was still
>>>>>>> changing. I thought about adding the Future and then just didn't get to 
>>>>>>> it.
>>>>>>> I agree that we should look into adding it for consistency.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>>
>>>>>>> On Tue, Jul 28, 2015 at 1:51 PM, Aditya Auradkar <
>>>>>>> aaurad...@linkedin.com> wrote:
>>>>>>>
>>>>>>>> Great discussion everyone!
>>>>>>>>
>>>>>>>> One general comment on the sync/async API's on the new consumer. I
>>>>>>>> think the producer tackles sync vs async API's well. For API's that can
>>>>>>>> either be sync or async, can we simply return a future? That seems more
>>>>>>>> elegant for the API's that make sense either in both flavors. From the
>>>>>>>> users perspective, it is more consistent with the new producer. One 
>>>>>>>> easy
>>>>>>>> example is the commit call with the CommitType enum.. we can make that 
>>>>>>>> call
>>>>>>>> always async and users can block on the future if they want to make 
>>>>>>>> sure
>>>>>>>> their offsets are committed.
>>>>>>>>
>>>>>>>> Aditya
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Jul 27, 2015 at 2:06 PM, Onur Karaman <
>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the great responses, everyone!
>>>>>>>>>
>>>>>>>>> To expand a tiny bit on my initial post: while I did bring up old
>>>>>>>>> high level consumers, the teams we spoke to were actually not the 
>>>>>>>>> types of
>>>>>>>>> services that simply wanted an easy way to get ConsumerRecords. We 
>>>>>>>>> spoke to
>>>>>>>>> infrastructure teams that I would consider to be closer to the 
>>>>>>>>> "power-user"
>>>>>>>>> end of the spectrum and would want KafkaConsumer's level of 
>>>>>>>>> granularity.
>>>>>>>>> Some would use auto group management. Some would use explicit group
>>>>>>>>> management. All of them would turn off auto offset commits. Yes, the 
>>>>>>>>> Samza
>>>>>>>>> team had prior experience with the old SimpleConsumer, but this is the
>>>>>>>>> first kafka consumer being used by the Databus team. So I don't really
>>>>>>>>> think the feedback received was about the simpler times or wanting
>>>>>>>>> additional higher-level clients.
>>>>>>>>>
>>>>>>>>> - Onur
>>>>>>>>>
>>>>>>>>> On Mon, Jul 27, 2015 at 1:41 PM, Jason Gustafson <
>>>>>>>>> ja...@confluent.io> wrote:
>>>>>>>>>
>>>>>>>>>> I think if we recommend a longer session timeout, then we should
>>>>>>>>>> expose the heartbeat frequency in configuration since this generally
>>>>>>>>>> controls how long normal rebalances will take. I think it's currently
>>>>>>>>>> hard-coded to 3 heartbeats per session timeout. It could also be 
>>>>>>>>>> nice to
>>>>>>>>>> have an explicit LeaveGroup request to implement clean shutdown of a
>>>>>>>>>> consumer. Then the coordinator doesn't have to wait for the timeout 
>>>>>>>>>> to
>>>>>>>>>> reassign partitions.
>>>>>>>>>>
>>>>>>>>>> -Jason
>>>>>>>>>>
>>>>>>>>>> On Mon, Jul 27, 2015 at 1:25 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hey Kartik,
>>>>>>>>>>>
>>>>>>>>>>> Totally agree we don't want people tuning timeouts in the common
>>>>>>>>>>> case.
>>>>>>>>>>>
>>>>>>>>>>> However there are two ways to avoid this:
>>>>>>>>>>> 1. Default the timeout high
>>>>>>>>>>> 2. Put the heartbeat in a separate thread
>>>>>>>>>>>
>>>>>>>>>>> When we were doing the consumer design we discussed this
>>>>>>>>>>> tradeoff and I think the conclusion we came to was that defaulting 
>>>>>>>>>>> to a
>>>>>>>>>>> high timeout was actually better. This means it takes a little 
>>>>>>>>>>> longer to
>>>>>>>>>>> detect a failure, but usually that is not a big problem and people 
>>>>>>>>>>> who want
>>>>>>>>>>> faster failure detection can tune it down. This seemed better than 
>>>>>>>>>>> having
>>>>>>>>>>> the failure detection not really cover the consumption and just be a
>>>>>>>>>>> background ping. The two reasons where (a) you still have the GC 
>>>>>>>>>>> problem
>>>>>>>>>>> even for the background thread, (b) consumption is in some sense a 
>>>>>>>>>>> better
>>>>>>>>>>> definition of an active healthy consumer and a lot of problems crop 
>>>>>>>>>>> up when
>>>>>>>>>>> you have an inactive consumer with an active background thread (as 
>>>>>>>>>>> today).
>>>>>>>>>>>
>>>>>>>>>>> When we had the discussion I think what we realized was that
>>>>>>>>>>> most people who were worried about the timeout where imagining a 
>>>>>>>>>>> very low
>>>>>>>>>>> default (500ms) say. But in fact just setting this to 60 seconds or 
>>>>>>>>>>> higher
>>>>>>>>>>> as a default would be okay, this adds to the failure detection time 
>>>>>>>>>>> but
>>>>>>>>>>> only apps that care about this need to tune. This should largely 
>>>>>>>>>>> eliminate
>>>>>>>>>>> false positives since after all if you disappear for 60 seconds that
>>>>>>>>>>> actually starts to be more of a true positive, even if you come 
>>>>>>>>>>> back... :-)
>>>>>>>>>>>
>>>>>>>>>>> -Jay
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Jul 27, 2015 at 1:05 PM, Kartik Paramasivam <
>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> adding the open source alias.  This email started off as a
>>>>>>>>>>>> broader discussion around the new consumer.  I was zooming into 
>>>>>>>>>>>> only the
>>>>>>>>>>>> aspect of poll() being the only mechanism for driving the 
>>>>>>>>>>>> heartbeats.
>>>>>>>>>>>>
>>>>>>>>>>>> Yes the lag is the effect of the problem (not the problem).
>>>>>>>>>>>> Monitoring the lag is important as it is the primary way to tell 
>>>>>>>>>>>> if the
>>>>>>>>>>>> application is wedged.  There might be other metrics which can 
>>>>>>>>>>>> possibly
>>>>>>>>>>>> capture the same essence. Yes the lag is at the consumer group 
>>>>>>>>>>>> level, but
>>>>>>>>>>>> you can tell that one of the consumers is messed up if one of the
>>>>>>>>>>>> partitions in the application start generating lag and others are 
>>>>>>>>>>>> good for
>>>>>>>>>>>> e.g.
>>>>>>>>>>>>
>>>>>>>>>>>> Monitoring aside, I think the main point of concern is that in
>>>>>>>>>>>> the old consumer most customers don't have to worry about 
>>>>>>>>>>>> unnecessary
>>>>>>>>>>>> rebalances and most of the things that they do in their app 
>>>>>>>>>>>> doesn't have an
>>>>>>>>>>>> impact on the session timeout..  (i.e. the only thing that causes
>>>>>>>>>>>> rebalances is when the GC is out of whack).    For the handful of 
>>>>>>>>>>>> customers
>>>>>>>>>>>> who are impacted by GC related rebalances, i would imagine that 
>>>>>>>>>>>> all of them
>>>>>>>>>>>> would really want us to make the system more resilient.    I agree 
>>>>>>>>>>>> that the
>>>>>>>>>>>> GC problem can't be solved easily in the java client, however it 
>>>>>>>>>>>> appears
>>>>>>>>>>>> that now we would be expecting the consuming applications to be 
>>>>>>>>>>>> even more
>>>>>>>>>>>> careful with ongoing tuning of the timeouts.  At LinkedIn, we have 
>>>>>>>>>>>> seen
>>>>>>>>>>>> that most kafka applications don't have much of a clue about 
>>>>>>>>>>>> configuring
>>>>>>>>>>>> the timeouts and just end up calling the Kafka team when their 
>>>>>>>>>>>> application
>>>>>>>>>>>> sees rebalances.
>>>>>>>>>>>>
>>>>>>>>>>>> The other side effect of poll driving the heartbeats is that we
>>>>>>>>>>>> have to make sure that people don't set a poll timeout that is 
>>>>>>>>>>>> larger than
>>>>>>>>>>>> the session timeout.   If we had a notion of implicit heartbeats 
>>>>>>>>>>>> then we
>>>>>>>>>>>> could also automatically make this work for consumers by sending 
>>>>>>>>>>>> hearbeats
>>>>>>>>>>>> at the appropriate interval even though the customers want to do a 
>>>>>>>>>>>> long
>>>>>>>>>>>> poll.
>>>>>>>>>>>>
>>>>>>>>>>>> We could surely work around this in LinkedIn if either we have
>>>>>>>>>>>> the Pause() api or an explicit HeartBeat() api on the consumer.
>>>>>>>>>>>>
>>>>>>>>>>>> Would love to hear how other people think about this subject ?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Kartik
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Jul 25, 2015 at 7:41 PM, Neha Narkhede <
>>>>>>>>>>>> n...@confluent.io> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Agree with the dilemma you are pointing out, which is that
>>>>>>>>>>>>> there are many ways the application's message processing could 
>>>>>>>>>>>>> fail and we
>>>>>>>>>>>>> wouldn't be able to model all of those in the consumer's failure 
>>>>>>>>>>>>> detection
>>>>>>>>>>>>> mechanism. So we should try to model as much of it as we can so 
>>>>>>>>>>>>> the
>>>>>>>>>>>>> consumer's failure detection is meaningful.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Point being that the only absolute way to really detect that
>>>>>>>>>>>>>> an app is healthy is to monitor lag. If the lag increases then 
>>>>>>>>>>>>>> for sure
>>>>>>>>>>>>>> something is wrong.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> The lag is merely the effect of the problem, not the problem
>>>>>>>>>>>>> itself. Lag is also a consumer group level concept and the 
>>>>>>>>>>>>> problem we have
>>>>>>>>>>>>> is being able to detect failures at the level of individual 
>>>>>>>>>>>>> consumer
>>>>>>>>>>>>> instances.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As you pointed out, a consumer that poll() is a stronger
>>>>>>>>>>>>> indicator of whether the consumer is alive or not. The dilemma 
>>>>>>>>>>>>> then is who
>>>>>>>>>>>>> defines what a healthy poll() frequency is. No one else but the 
>>>>>>>>>>>>> application
>>>>>>>>>>>>> owner can define what a "normal" processing latency is for their
>>>>>>>>>>>>> application. Now the question is what's the easiest way for the 
>>>>>>>>>>>>> user to
>>>>>>>>>>>>> define this without having to tune and fine tune this too often. 
>>>>>>>>>>>>> The
>>>>>>>>>>>>> heartbeat interval certainly does not have to be *exactly*
>>>>>>>>>>>>> 99tile of processing latency but could be in the ballpark + an 
>>>>>>>>>>>>> error delta.
>>>>>>>>>>>>> The error delta is the application owner's acceptable risk 
>>>>>>>>>>>>> threshold during
>>>>>>>>>>>>> which they would be ok if the application remains part of the 
>>>>>>>>>>>>> group despite
>>>>>>>>>>>>> being dead. It is ultimately a tradeoff between operational ease 
>>>>>>>>>>>>> and more
>>>>>>>>>>>>> accurate failure detection.
>>>>>>>>>>>>>
>>>>>>>>>>>>> With quotas the write latencies to kafka could range from a
>>>>>>>>>>>>>> few milliseconds all the way to a tens of seconds.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is actually no different from the GC problem. Most most
>>>>>>>>>>>>> of the times, the normal GC falls in the few ms range and there 
>>>>>>>>>>>>> are many
>>>>>>>>>>>>> applications even at LinkedIn for which the max GC falls in the 
>>>>>>>>>>>>> multiple
>>>>>>>>>>>>> seconds range. Note that it also can't be predicted, so has to be 
>>>>>>>>>>>>> an
>>>>>>>>>>>>> observed value. One way or the other, you have to observe what 
>>>>>>>>>>>>> this
>>>>>>>>>>>>> acceptable "max" is for your application and then set the 
>>>>>>>>>>>>> appropriate
>>>>>>>>>>>>> timeouts.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Since this is not something that can be automated, this is a
>>>>>>>>>>>>> config that the application owner has to set based on the 
>>>>>>>>>>>>> expected behavior
>>>>>>>>>>>>> of their application. Not wanting to do that leads to ending up 
>>>>>>>>>>>>> with bad
>>>>>>>>>>>>> consumption semantics where the application process continues to 
>>>>>>>>>>>>> be part of
>>>>>>>>>>>>> a group owning partitions but not consuming since it has halted 
>>>>>>>>>>>>> due to a
>>>>>>>>>>>>> problem. The fact that the design requires them to express that 
>>>>>>>>>>>>> in poll()
>>>>>>>>>>>>> frequency or not doesn't change the fact that the application 
>>>>>>>>>>>>> owner has to
>>>>>>>>>>>>> go through the process of measuring and then defining this "max".
>>>>>>>>>>>>>
>>>>>>>>>>>>> The reverse where they don't do this and the application
>>>>>>>>>>>>> remains in the group despite being dead is super confusing and 
>>>>>>>>>>>>> frustrating
>>>>>>>>>>>>> too. So the due diligence up front is actually worth. And as long 
>>>>>>>>>>>>> as the
>>>>>>>>>>>>> poll() latency and processing latency can be monitored, it should 
>>>>>>>>>>>>> be easy
>>>>>>>>>>>>> to tell the reason for a rebalance, whether that is valid or not 
>>>>>>>>>>>>> and how
>>>>>>>>>>>>> that should be tuned.
>>>>>>>>>>>>>
>>>>>>>>>>>>> As for the wrapper, KIP-28 is the wrapper in open source that
>>>>>>>>>>>>> will hide this complexity and I agree that LI is unblocked since 
>>>>>>>>>>>>> you can do
>>>>>>>>>>>>> this in TrackerConsumer in the meantime.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Jul 25, 2015 at 4:30 PM, Kartik Paramasivam <
>>>>>>>>>>>>> kparamasi...@linkedin.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> For commit(), I think it should hopefully be an easier
>>>>>>>>>>>>>> discussion, so maybe we can follow up when we meet up next.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As far as the heartbeat is concerned, I think the points you
>>>>>>>>>>>>>> discuss are all very valid.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> GC pauses impacting the heartbeats is a real issue. However
>>>>>>>>>>>>>> there are a smaller percentage of memory hungry apps that get 
>>>>>>>>>>>>>> hit by it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The broader issue whereby even if the heartbeats are healthy,
>>>>>>>>>>>>>> the app might not be behaving correctly is also real.  If the 
>>>>>>>>>>>>>> app is
>>>>>>>>>>>>>> calling poll() then the probability that the app is healthy is 
>>>>>>>>>>>>>> surely
>>>>>>>>>>>>>> higher.  But this again isn't an absolute measure that the app is
>>>>>>>>>>>>>> processing correctly.
>>>>>>>>>>>>>> In other cases the app might have even died in which case
>>>>>>>>>>>>>> this discussion is moot.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Point being that the only absolute way to really detect that
>>>>>>>>>>>>>> an app is healthy is to monitor lag. If the lag increases then 
>>>>>>>>>>>>>> for sure
>>>>>>>>>>>>>> something is wrong.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The proposal seems to be that the application needs to tune
>>>>>>>>>>>>>> their session timeout based on the 99tile of the time they take 
>>>>>>>>>>>>>> to process
>>>>>>>>>>>>>> events after every poll.   This turns out is a nontrivial thing 
>>>>>>>>>>>>>> to do for
>>>>>>>>>>>>>> an application todo. To start with when an application is new 
>>>>>>>>>>>>>> their data is
>>>>>>>>>>>>>> going to be based on tests that they have done on synthetic 
>>>>>>>>>>>>>> data.  This
>>>>>>>>>>>>>> often times doesn't represent what they will see in production.  
>>>>>>>>>>>>>> Once the
>>>>>>>>>>>>>> app is in production their processing latencies will potentially 
>>>>>>>>>>>>>> vary over
>>>>>>>>>>>>>> time.  It is extremely unlikely that the application owner does 
>>>>>>>>>>>>>> a careful
>>>>>>>>>>>>>> job of monitoring the 99tile of latencies over time and readjust 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> settings.  Often times the latencies vary because of variance is 
>>>>>>>>>>>>>> other
>>>>>>>>>>>>>> services that are called by the consumer as part of processing 
>>>>>>>>>>>>>> the events.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Case in point would be a simple app which reads events and
>>>>>>>>>>>>>> writes to Kafka.  With quotas the write latencies to kafka could 
>>>>>>>>>>>>>> range from
>>>>>>>>>>>>>> a few milliseconds all the way to a tens of seconds.  As the 
>>>>>>>>>>>>>> scale of
>>>>>>>>>>>>>> processing for an app increasing the app or that 'user' could 
>>>>>>>>>>>>>> now get
>>>>>>>>>>>>>> quotaed.  Instead of slowing down gracefully unless the 
>>>>>>>>>>>>>> application owner
>>>>>>>>>>>>>> has carefully tuned the timeout, now we are looking at a 
>>>>>>>>>>>>>> potential outage
>>>>>>>>>>>>>> where the app could get hit by constant rebalances.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If we expose the pause() Api then It is possible for us to
>>>>>>>>>>>>>> take care of this in the linkedin wrapper.  Whereby we would 
>>>>>>>>>>>>>> keep calling
>>>>>>>>>>>>>> poll on a separate thread periodically and enqueue the messages. 
>>>>>>>>>>>>>> When the
>>>>>>>>>>>>>> queue is full we would call pause().
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In essence we can work around it in LinkedIn, however I think
>>>>>>>>>>>>>> it is vastly better if we address this in the Api as every major 
>>>>>>>>>>>>>> customer
>>>>>>>>>>>>>> will eventually be pained by it.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kartik
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Jul 24, 2015, at 10:08 PM, Jay Kreps <j...@confluent.io>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hey guys,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Happy to discuss. I agree there may be some rough edges and
>>>>>>>>>>>>>> now is definitely the time to clean them up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm pretty reluctant to change the threading model or undergo
>>>>>>>>>>>>>> a big api redesign at this point beyond the group management 
>>>>>>>>>>>>>> stuff we've
>>>>>>>>>>>>>> discussed in the context of Samza/copycat which is already a big 
>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Overall I agree that we have done a poor job of documenting
>>>>>>>>>>>>>> which apis block and which don't and when people are surprised 
>>>>>>>>>>>>>> because we
>>>>>>>>>>>>>> haven't labeled something that will be unintuitive. But the 
>>>>>>>>>>>>>> overall style
>>>>>>>>>>>>>> of poll/select-based apis is quite common in programming going 
>>>>>>>>>>>>>> back to unix
>>>>>>>>>>>>>> select so I don't think it's beyond people if explained well 
>>>>>>>>>>>>>> (after all we
>>>>>>>>>>>>>> need to mix sync and async apis and if we don't say which is 
>>>>>>>>>>>>>> which any
>>>>>>>>>>>>>> scheme will be confusing).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For what it's worth the experience with this api has actually
>>>>>>>>>>>>>> been about 1000x better than the issues people had around 
>>>>>>>>>>>>>> intuitiveness
>>>>>>>>>>>>>> with the high-level api. The crazy blocking iterator, impossible 
>>>>>>>>>>>>>> internal
>>>>>>>>>>>>>> queue sizing, baroque threading model, etc  have all caused 
>>>>>>>>>>>>>> endless amounts
>>>>>>>>>>>>>> of anger. Not to mention that that client effectively 
>>>>>>>>>>>>>> disqualifies about
>>>>>>>>>>>>>> 50% of the use cases people want to try to use it for (plus I 
>>>>>>>>>>>>>> regularly
>>>>>>>>>>>>>> hear people tell me they've heard not to use it at all for 
>>>>>>>>>>>>>> various reasons
>>>>>>>>>>>>>> ranging from data loss to lack of features). It's important to 
>>>>>>>>>>>>>> have that
>>>>>>>>>>>>>> context when people need to switch and they say "oh the old way 
>>>>>>>>>>>>>> was so
>>>>>>>>>>>>>> simple and the new way complex!" :-)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let me give some context related to your points, based on our
>>>>>>>>>>>>>> previous discussions:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For commit, let's discuss, that is easy either way.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The motivation for avoiding additional threading was
>>>>>>>>>>>>>> two-fold. First this client is really intended to be the lowest 
>>>>>>>>>>>>>> level
>>>>>>>>>>>>>> client. There are many, many possible higher level processing 
>>>>>>>>>>>>>> abstractions.
>>>>>>>>>>>>>> One thing we found to be a big problem with the high-level 
>>>>>>>>>>>>>> client was that
>>>>>>>>>>>>>> it coupled things everyone must have--failover, etc--with things 
>>>>>>>>>>>>>> that are
>>>>>>>>>>>>>> different in each use case like the appropriate threading model. 
>>>>>>>>>>>>>> If you do
>>>>>>>>>>>>>> this you need to also maintain a thread free low-level consumer 
>>>>>>>>>>>>>> api for
>>>>>>>>>>>>>> people to get around whatever you have done.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The second reason was that the internal threading in the
>>>>>>>>>>>>>> client became quite complex. The answer with threading is always 
>>>>>>>>>>>>>> that "it
>>>>>>>>>>>>>> won't be complex this time", but it always is.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For the heartbeat you correctly describe the downside to
>>>>>>>>>>>>>> coupling heartbeat with poll--the contract is that the 
>>>>>>>>>>>>>> application must
>>>>>>>>>>>>>> regularly consume to be considered an active consumer. This 
>>>>>>>>>>>>>> allows the
>>>>>>>>>>>>>> possibility of false positive failure detections. However it's 
>>>>>>>>>>>>>> important to
>>>>>>>>>>>>>> understand the downside of the alternative. If you do background 
>>>>>>>>>>>>>> polling a
>>>>>>>>>>>>>> consumer is considered active as long as it isn't shutdown. This 
>>>>>>>>>>>>>> leads to
>>>>>>>>>>>>>> all kinds of active consumers that aren't consuming because they 
>>>>>>>>>>>>>> have
>>>>>>>>>>>>>> leaked or otherwise stopped but are still claiming partitions and
>>>>>>>>>>>>>> heart-beating. This failure mode is actually far far worse. If 
>>>>>>>>>>>>>> you allow
>>>>>>>>>>>>>> false positives the user sees the frequent rebalances and knows 
>>>>>>>>>>>>>> they aren't
>>>>>>>>>>>>>> consuming frequently enough to be considered active but if you 
>>>>>>>>>>>>>> allows false
>>>>>>>>>>>>>> negatives you end up having weeks go by before someone notices 
>>>>>>>>>>>>>> that a
>>>>>>>>>>>>>> partition has been unconsumed the whole time at which point the 
>>>>>>>>>>>>>> data is
>>>>>>>>>>>>>> gone. Plus of course even if you do this you still have regular 
>>>>>>>>>>>>>> false
>>>>>>>>>>>>>> positives anyway from GC pauses (as now). We discussed this in 
>>>>>>>>>>>>>> some depth
>>>>>>>>>>>>>> at the time and decided that it is better to have the liveness 
>>>>>>>>>>>>>> notion tied
>>>>>>>>>>>>>> to *actual* consumption which is the actual definition of
>>>>>>>>>>>>>> liveness.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -Jay
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Jul 24, 2015 at 5:35 PM, Onur Karaman <
>>>>>>>>>>>>>> okara...@linkedin.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Confluent Team.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> There has recently been a lot of open source activity
>>>>>>>>>>>>>>> regarding the new KafkaConsumer:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2123
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2350
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://issues.apache.org/jira/browse/KAFKA-2359
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> http://mail-archives.apache.org/mod_mbox/kafka-users/201507.mbox/%3ccaauywg_pwbs3hsevnp5rccmpvqbaamap+zgn8fh+woelvt_...@mail.gmail.com%3E
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We’ve explained the KafkaConsumer API to the Databus, Samza,
>>>>>>>>>>>>>>> and some other teams and we got similar feedback.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> To summarize the feedback we received from other teams:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    The current behavior is not intuitive. For example,
>>>>>>>>>>>>>>>    KafkaConsumer.poll drives everything. The other methods like 
>>>>>>>>>>>>>>> subscribe,
>>>>>>>>>>>>>>>    unsubscribe, seek, commit(async) don’t do anything without a
>>>>>>>>>>>>>>>    KafkaConsumer.poll call.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    The semantics of a commit() call should be consistent
>>>>>>>>>>>>>>>    between sync and async operations. Currently, sync commit is 
>>>>>>>>>>>>>>> a blocking
>>>>>>>>>>>>>>>    call which actually sends out an OffsetCommitRequest and 
>>>>>>>>>>>>>>> waits for the
>>>>>>>>>>>>>>>    response upon the user’s KafkaConsumer.commit call. However, 
>>>>>>>>>>>>>>> the async
>>>>>>>>>>>>>>>    commit is a nonblocking call which just queues up the 
>>>>>>>>>>>>>>> OffsetCommitRequest.
>>>>>>>>>>>>>>>    The request itself is later sent out in the next poll. The 
>>>>>>>>>>>>>>> teams we talked
>>>>>>>>>>>>>>>    to found this misleading.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    1.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>    Heartbeats are dependent on user application behavior
>>>>>>>>>>>>>>>    (i.e. user applications calling poll). This can be a big 
>>>>>>>>>>>>>>> problem as we
>>>>>>>>>>>>>>>    don’t control how different applications behave. For 
>>>>>>>>>>>>>>> example, we might have
>>>>>>>>>>>>>>>    an application which reads from Kafka and writes to 
>>>>>>>>>>>>>>> Espresso. If Espresso
>>>>>>>>>>>>>>>    is slow for whatever reason, then in rebalances could happen.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Generally speaking, we feel that the current KafkaConsumer
>>>>>>>>>>>>>>> API design is more of a wrapping around the old simple 
>>>>>>>>>>>>>>> consumer, i.e. in
>>>>>>>>>>>>>>> old consumer we ask users to deal with raw protocols and error 
>>>>>>>>>>>>>>> handlings
>>>>>>>>>>>>>>> while in KafkaConsumer we do that for users. However, for old 
>>>>>>>>>>>>>>> high level
>>>>>>>>>>>>>>> consumer users (which are the majority of users), the 
>>>>>>>>>>>>>>> experience is a
>>>>>>>>>>>>>>> noticeable regression. The old high level consumer interface is 
>>>>>>>>>>>>>>> simple and
>>>>>>>>>>>>>>> easy to use for end user, while KafkaConsumer requires users to 
>>>>>>>>>>>>>>> be aware of
>>>>>>>>>>>>>>> many underlying details and is becoming prohibitive for users 
>>>>>>>>>>>>>>> to adopt.
>>>>>>>>>>>>>>> This is hinted by the javadoc growing bigger and bigger.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We think it's getting to the point where we should take a
>>>>>>>>>>>>>>> step back and look at the big picture.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The current state of KafkaConsumer is that it's
>>>>>>>>>>>>>>> single-threaded. There's one big KafkaConsumer.poll called by 
>>>>>>>>>>>>>>> the user
>>>>>>>>>>>>>>> which pretty much drives everything:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - data fetches
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - join groups (new consumer joining a group, topic
>>>>>>>>>>>>>>> subscription changes, reacting to group rebalance)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - async offset commits
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Given that the selector's poll is being driven by the end
>>>>>>>>>>>>>>> user, this ends up making us educate users on NIO and the 
>>>>>>>>>>>>>>> consequences of
>>>>>>>>>>>>>>> not calling KafkaConsumer.poll frequently enough:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - Coordinator will mark the consumer dead
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - async commits won't send
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - callbacks won't fire
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> More generally speaking, there are many surprises with the
>>>>>>>>>>>>>>> current KafkaConsumer implementation.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here's what we consider to be the goals of KafkaConsumer:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - NIO
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - ability to commit, manipulate offsets, and consume messages
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - a way to subscribe to topics(auto group management) or
>>>>>>>>>>>>>>> partitions(explicit group management)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - no surprises in the user experience
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The last point is the big one that we think we aren't
>>>>>>>>>>>>>>> hitting. We think the most important example is that there 
>>>>>>>>>>>>>>> should be no
>>>>>>>>>>>>>>> requirement from the end user to consistently 
>>>>>>>>>>>>>>> KafkaConsumer.poll in order
>>>>>>>>>>>>>>> for all of the above tasks to happen. We think it would be 
>>>>>>>>>>>>>>> better to split
>>>>>>>>>>>>>>> those tasks into tasks that should not rely on 
>>>>>>>>>>>>>>> KafkaConsumer.poll and tasks
>>>>>>>>>>>>>>> that should rely on KafkaConsumer.poll.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Tasks that should not rely on KafkaConsumer.poll:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - heartbeats
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - join groups
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - commits
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> - executing callbacks
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Only data fetches should rely on KafkaConsumer.poll
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This would help reduce the amount of surprises to the end
>>>>>>>>>>>>>>> user.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> We’ve sketched out a proposal and we’ll send it out to you
>>>>>>>>>>>>>>> guys early next week. We’d like to meet up with you at LinkedIn 
>>>>>>>>>>>>>>> on *July
>>>>>>>>>>>>>>> 31, 2015* so we can talk about it before proposing it to
>>>>>>>>>>>>>>> open source.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>> LinkedIn Kafka Dev Team
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Neha
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Thanks,
>>>>>>> Neha
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> -- Guozhang
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Thanks,
>>>>> Ewen
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Thanks,
>>> Neha
>>>
>>
>>
>
>
> --
> Thanks,
> Neha
>

Reply via email to