> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > This looks good so far. I think it's much easier to understand when all the 
> > blocking stuff happens at the KafkaConsumer level and each of the classes 
> > it uses only ever handles single requests. It'd be nice to document the 
> > basic architecture somewhere since it took me a bit to fully figure it out. 
> > (Unfortunately, since the javadocs for the consumer are in the 
> > implementation class KafkaConsumer instead of on the Consumer interface, we 
> > can't put this with the KafkaConsumer class...)
> > 
> > Some notes in addition to the inline stuff:
> > 
> > Some functionality has been pulled back up to KafkaConsumer, in a mild 
> > reversal of Guozhang's refactoring. It'd be nice to keep this to a minimum. 
> > The ones that stuck out to me were 
> > resetOffsets()/resetOffset()/offsetBefore(). I'm guessing you also couldn't 
> > figure out a way to keep it in Fetcher since the inner call to 
> > offsetBefore() requires that blocking loop?
> > 
> > Some of the handling of DelayedResponse and its subclasses seems redundant or 
> > follows a common pattern, and could perhaps be refactored into utility code. However, 
> > there are few enough places it's happening now that I don't think it's a 
> > big deal. It does seem a bit wasteful that we have to continually create 
> > these DelayedResponse objects even in cases where we know we'll fail fast, 
> > but I suppose those cases should be unusual and the cost to allocate them 
> > isn't all that high.
> > 
> > Finally, a readability/cleanliness thing. This patch adds more nested 
> > anonymous RequestCompletionHandler classes. I think these are fine as they 
> > are, but if the implementations get too long or branchy with all the 
> > various error conditions they can become unreadably over-indented. Taking 
> > some of the big ones and using named nested classes might help improve 
> > clarity, although it does separate the request initiating code from the 
> > response handling code.

Yeah, that's right. The offsetBefore method was the tricky one to deal with. I 
tried to keep the lower level logic in Fetcher, but resetOffsets ended up 
leaking into KafkaConsumer. That was before I started using the DelayedResponse 
interface though, so there may be a way to move it back to Fetcher.
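
Here is a minimal sketch of the layering Ewen describes. It assumes a much simplified result type and network stub. The Fetcher-style method only initiates one request and returns a pending result, and the KafkaConsumer-style method owns the blocking loop. `PendingResult`, `NetworkClientStub`, `sendOffsetRequest`, and `awaitResult` are hypothetical names, not the patch's actual API.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical single-request result: completed exactly once by the response callback.
final class PendingResult<T> {
    private T value;
    private boolean done = false;

    void complete(T value) {
        if (done) throw new IllegalStateException("already completed");
        this.value = value;
        this.done = true;
    }

    boolean isDone() { return done; }

    T value() {
        if (!done) throw new IllegalStateException("not completed yet");
        return value;
    }
}

// Hypothetical stand-in for the network layer: poll() delivers one queued response per call.
final class NetworkClientStub {
    private final Queue<Runnable> pendingCallbacks = new ArrayDeque<>();

    void send(Runnable onResponse) { pendingCallbacks.add(onResponse); }

    void poll() {
        Runnable callback = pendingCallbacks.poll();
        if (callback != null) callback.run();
    }
}

final class BlockingLoop {
    // Fetcher-style: initiates exactly one request and returns without blocking.
    static PendingResult<Long> sendOffsetRequest(NetworkClientStub client, long offset) {
        PendingResult<Long> result = new PendingResult<>();
        client.send(() -> result.complete(offset));  // simulated broker reply
        return result;
    }

    // KafkaConsumer-style: the only place that blocks, polling until the result is ready.
    // A real implementation would also bound this loop with a timeout.
    static <T> T awaitResult(NetworkClientStub client, PendingResult<T> result) {
        while (!result.isDone()) {
            client.poll();
        }
        return result.value();
    }
}
```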

We could definitely cut down on DelayedResponse allocation by reusing a few 
static instances. I think there are only a couple of cases where it is 
actually necessary to have a new instance. I also think the usage is currently 
a little nasty since you have to make sure that the delayed response is 
finished through all code paths. That could 
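
Here is a rough sketch of the static-instance idea. It uses a hypothetical immutable `RequestResult` type rather than the patch's DelayedResponse. Only results that are already complete when created (the fail-fast cases) can be shared safely. So each failure reason gets one shared instance, while results that carry a value are still allocated fresh. `FailureReason` and its values are made up for illustration.

```java
// Hypothetical reasons a request can fail fast before anything is sent.
enum FailureReason { COORDINATOR_NOT_AVAILABLE, NODE_NOT_READY }

// Hypothetical immutable result: safe to share because it never changes after construction.
final class RequestResult<T> {
    private static final RequestResult<?> SHARED_COORDINATOR_NOT_AVAILABLE =
            new RequestResult<>(null, FailureReason.COORDINATOR_NOT_AVAILABLE);
    private static final RequestResult<?> SHARED_NODE_NOT_READY =
            new RequestResult<>(null, FailureReason.NODE_NOT_READY);

    private final T value;
    private final FailureReason failure;

    private RequestResult(T value, FailureReason failure) {
        this.value = value;
        this.failure = failure;
    }

    // A result carrying a value needs its own instance.
    static <T> RequestResult<T> success(T value) {
        return new RequestResult<>(value, null);
    }

    // Fail-fast results carry no value, so one shared instance per reason is enough.
    @SuppressWarnings("unchecked")
    static <T> RequestResult<T> failed(FailureReason reason) {
        switch (reason) {
            case COORDINATOR_NOT_AVAILABLE:
                return (RequestResult<T>) SHARED_COORDINATOR_NOT_AVAILABLE;
            case NODE_NOT_READY:
                return (RequestResult<T>) SHARED_NODE_NOT_READY;
            default:
                throw new IllegalArgumentException("unknown reason: " + reason);
        }
    }

    boolean succeeded() { return failure == null; }
    FailureReason failure() { return failure; }
    T value() { return value; }
}
```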

As you mention, it's kind of nice having the handler code right there in the 
method. I actually kind of like the pattern used in some other cases where 
instead of a nested class, an anonymous class is created which simply delegates 
to a handler method.
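
Here is a small sketch of that delegation pattern. It assumes a simplified `CompletionHandler` interface that takes a String in place of the real client response type. `CommitRequestSender` and `handleCommitResponse` are hypothetical. The anonymous class only forwards to a handler method, and the branchy error handling lives in that named method.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for a completion-handler interface; the response is just a String here.
interface CompletionHandler {
    void onComplete(String response);
}

final class CommitRequestSender {
    final List<String> handled = new ArrayList<>();

    // The request-initiating code stays short: the anonymous class only delegates.
    CompletionHandler sendCommitRequest(final int generation) {
        return new CompletionHandler() {
            @Override
            public void onComplete(String response) {
                handleCommitResponse(response, generation);
            }
        };
    }

    // Error branches live in a named method instead of deep inside the anonymous class.
    private void handleCommitResponse(String response, int generation) {
        if (response.startsWith("ERROR")) {
            handled.add("retry commit for generation " + generation);
        } else {
            handled.add("committed for generation " + generation);
        }
    }
}
```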


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 683
> > <https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line683>
> >
> >     I think this is simpler and clearer if you reorder these two -- reset 
> > the offsets that need it first, then update fetch positions for ones that 
> > are missing it. I think this removes the extra conditional 
> > !subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions 
> > too.

Yeah, I agree. I'll have to rework the code a little bit since 
updateFetchPositions can lead to an offset reset (in the case that the fetched 
and committed positions are both null), but I can probably handle that case in 
a separate method.
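
The reordered logic could look like the following sketch. It uses a hypothetical `PositionState` with plain strings for partitions and a stubbed `lookupResetOffset`; the real code would issue an offset request. Explicit resets run first, so the second pass no longer needs an extra `!subscriptions.offsetResetNeeded(tp)` check. The case where both the fetch position and the committed offset are null goes through a separate method.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified subscription state keyed by partition name.
final class PositionState {
    final Set<String> assigned = new HashSet<>();
    final Map<String, Long> fetchPositions = new HashMap<>();
    final Map<String, Long> committed = new HashMap<>();
    final Set<String> resetNeeded = new HashSet<>();
}

final class FetchPositionUpdater {
    // Hypothetical stand-in for looking up the reset offset (e.g. earliest) from the broker.
    static long lookupResetOffset(String partition) {
        return 0L;
    }

    static void updateFetchPositions(PositionState state) {
        // 1. Reset the partitions explicitly flagged for a reset first.
        for (String tp : state.resetNeeded) {
            state.fetchPositions.put(tp, lookupResetOffset(tp));
        }
        state.resetNeeded.clear();

        // 2. Fill in remaining missing positions from committed offsets.
        for (String tp : state.assigned) {
            if (state.fetchPositions.containsKey(tp)) continue;
            Long committedOffset = state.committed.get(tp);
            if (committedOffset != null) {
                state.fetchPositions.put(tp, committedOffset);
            } else {
                resetMissingPosition(state, tp);
            }
        }
    }

    // Separate path for partitions with neither a fetch position nor a committed offset.
    private static void resetMissingPosition(PositionState state, String tp) {
        state.fetchPositions.put(tp, lookupResetOffset(tp));
    }
}
```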


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 696
> > <https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line696>
> >
> >     Any reason not to do this? Definitely seems like it'd be necessary 
> > although the logic might be more complicated than just moving 
> > maybeHeartbeat() in here -- some of the other lead-up to this could change 
> > if you saw a partition reassignment in the middle of a long poll, requiring 
> > fetch positions to be updated, etc.

No particular reason. I just hadn't thought it all the way through yet. Perhaps 
to make it easy, we could just exit the poll call if we need a partition 
reassignment or an offset update?
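
Here is a sketch of the early-exit option. It assumes the poll loop can consult a hypothetical `reassignmentNeeded` check before each network round trip. The caller would then redo the reassignment and fetch-position steps on its next `poll()` call. `EarlyExitPoll` and its parameters are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

final class EarlyExitPoll {
    // Polls until records arrive or the timeout elapses, but returns early (possibly empty)
    // as soon as reassignmentNeeded reports a pending partition reassignment or offset update.
    static List<String> poll(long timeoutMs,
                             Supplier<List<String>> pollOnce,
                             BooleanSupplier reassignmentNeeded) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        List<String> records = new ArrayList<>();
        while (records.isEmpty() && System.currentTimeMillis() < deadline) {
            if (reassignmentNeeded.getAsBoolean()) {
                return records;  // caller redoes assignment and positions before polling again
            }
            records.addAll(pollOnce.get());  // one network round trip (stubbed)
        }
        return records;
    }
}
```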


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 988
> > <https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line988>
> >
> >     Besides grouping by node, we could also send these requests out in 
> > parallel. The drawback to simplifying this all to use a series of 
> > offsetBefore() calls is that each blocks, so resetting a bunch of offsets 
> > is going to be pretty slow.
> >     
> >     The obvious solution is a utility that lets you run a bunch of requests in 
> > parallel, then do the same looping you're doing waiting for a single 
> > response but handle a bunch all at once.

This was a limitation of the original code, which I have preserved for 
simplicity. After this restructuring, it's starting to look like a 
straightforward optimization.
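
The optimization could look roughly like this sketch. `Pending` and `Network` are hypothetical stand-ins for a result holder and the network client. Every request is sent before any waiting happens, and then a single loop polls until all of them have completed. The wait is therefore roughly bounded by the slowest response rather than the sum of all of them. The fake offset (`tp.length()`) only keeps the example deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ParallelRequests {
    // Hypothetical pending-offset holder, filled in by a response callback.
    static final class Pending {
        Long offset;
        boolean isDone() { return offset != null; }
    }

    // Hypothetical network stub: each poll() delivers every response that is ready.
    static final class Network {
        final Queue<Runnable> inFlight = new ArrayDeque<>();
        int polls = 0;

        void send(Runnable onResponse) { inFlight.add(onResponse); }

        void poll() {
            polls++;
            while (!inFlight.isEmpty()) inFlight.poll().run();
        }
    }

    // Sends one offset request per partition up front, then polls until all have completed.
    static List<Long> resetAll(Network network, List<String> partitions) {
        List<Pending> pending = new ArrayList<>();
        for (String tp : partitions) {
            Pending p = new Pending();
            long answer = tp.length();  // fake offset so the example is deterministic
            network.send(() -> p.offset = answer);
            pending.add(p);
        }
        while (!allDone(pending)) network.poll();
        List<Long> offsets = new ArrayList<>();
        for (Pending p : pending) offsets.add(p.offset);
        return offsets;
    }

    private static boolean allDone(List<Pending> pending) {
        for (Pending p : pending) if (!p.isDone()) return false;
        return true;
    }
}
```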


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1098
> > <https://reviews.apache.org/r/34789/diff/3/?file=976965#file976965line1098>
> >
> >     In async mode, why bother polling at all here? You mentioned 
> > coordinator connection issues in the comment below, but if we need to 
> > handle that here, aren't we effectively making this a synchronous request? 
> > Or semi-synchronous since it has to wait for the coordinator?
> >     
> >     Or is this best left to a follow-up, since async mode has other issues 
> > anyway: we need callbacks, and there were other ordering issues in the 
> > implementation?

This is actually consistent with the old implementation which always blocked 
while ensuring that the coordinator was ready. The trouble is that the commit 
is lost when the coordinator is unavailable (even if it just hasn't been 
connected to). This is where your delayed task queue might be handy. We can 
reschedule the commit for a later time when the coordinator is healthy again.
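
Here is a sketch of how rescheduling might work. It assumes a hypothetical `DelayedTaskQueue` ordered by due time and a fixed `retryBackoffMs`; neither is an existing Kafka class. When the coordinator is unavailable, the commit is rescheduled instead of being dropped or forcing the caller to block.

```java
import java.util.PriorityQueue;

// Hypothetical delayed task queue: tasks run once their due time has passed.
final class DelayedTaskQueue {
    private static final class Entry implements Comparable<Entry> {
        final long dueMs;
        final Runnable task;

        Entry(long dueMs, Runnable task) { this.dueMs = dueMs; this.task = task; }

        @Override
        public int compareTo(Entry other) { return Long.compare(dueMs, other.dueMs); }
    }

    private final PriorityQueue<Entry> queue = new PriorityQueue<>();

    void schedule(Runnable task, long dueMs) { queue.add(new Entry(dueMs, task)); }

    // Runs every task due at or before nowMs, earliest first.
    void runDue(long nowMs) {
        while (!queue.isEmpty() && queue.peek().dueMs <= nowMs) {
            queue.poll().task.run();
        }
    }

    int size() { return queue.size(); }
}

final class AsyncCommitter {
    private final DelayedTaskQueue tasks;
    private final long retryBackoffMs;
    boolean coordinatorAvailable = false;
    int commitsSent = 0;

    AsyncCommitter(DelayedTaskQueue tasks, long retryBackoffMs) {
        this.tasks = tasks;
        this.retryBackoffMs = retryBackoffMs;
    }

    // Sends the commit if the coordinator is available; otherwise retries after a backoff.
    void commitAsync(long nowMs) {
        if (coordinatorAvailable) {
            commitsSent++;  // stand-in for actually sending the commit request
        } else {
            long retryAt = nowMs + retryBackoffMs;
            tasks.schedule(() -> commitAsync(retryAt), retryAt);
        }
    }
}
```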


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java,
> >  line 15
> > <https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line15>
> >
> >     The classes named XResponse may be a bit confusing because the protocol 
> > responses use that terminology. Future? Result?

Agreed. In fact, they were XResult initially. I changed them because 
BrokerResult and CoordinatorResult didn't seem to suggest as clearly what they 
were for as BrokerResponse and CoordinatorResponse. I considered Future as 
well, but its usage is a bit different from traditional Java Futures. Perhaps 
XReply?


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java,
> >  line 16
> > <https://reviews.apache.org/r/34789/diff/3/?file=976967#file976967line16>
> >
> >     Also, probably worth adding some javadocs to explain how these 
> > classes work: similar to a Future, but the one potential request may or may not 
> > be made, and the first issue encountered causes the 
> > Delayed/Broker/CoordinatorResponse to become ready with some sort of 
> > result. That result is either one of a small set of issues (for 
> > Broker/CoordinatorResponses) or the value obtained from a valid response 
> > from the broker.

I will do so.
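
Here is one possible shape for that documentation. It is shown on a hypothetical `RequestOutcome` class rather than the patch's actual BrokerResponse/CoordinatorResponse. The `Condition` values are made-up examples of the "small set of issues".

```java
/**
 * Hypothetical sketch of the documented semantics (not the patch's actual class).
 *
 * <p>Like a {@code Future}, this represents the outcome of at most one request. Unlike a
 * {@code Future}, the request may never be sent: the first problem encountered (for example,
 * the broker is not connected yet) completes the outcome immediately. Once completed, it holds
 * either one of a small set of conditions or the value parsed from a valid broker response,
 * and it cannot be completed again.
 */
final class RequestOutcome<T> {
    /** Illustrative set of non-value outcomes a caller must handle. */
    enum Condition { NOT_READY, DISCONNECTED, NEEDS_METADATA_REFRESH }

    private boolean done = false;
    private T value;
    private Condition condition;

    /** Completes with the value from a valid broker response. */
    void complete(T value) {
        ensureNotDone();
        this.value = value;
        this.done = true;
    }

    /** Completes with one of the known conditions, possibly before any request is sent. */
    void fail(Condition condition) {
        ensureNotDone();
        this.condition = condition;
        this.done = true;
    }

    boolean isDone() { return done; }
    boolean hasValue() { return done && condition == null; }
    T value() { return value; }
    Condition condition() { return condition; }

    private void ensureNotDone() {
        if (done) throw new IllegalStateException("outcome already completed");
    }
}
```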


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 70
> > <https://reviews.apache.org/r/34789/diff/3/?file=976968#file976968line70>
> >
> >     You can get rid of this -- metadata is no longer used in this class. 
> > Nice work, that should help make the layering of these classes cleaner.

Good catch. I had the feeling that it was becoming unnecessary.


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 134
> > <https://reviews.apache.org/r/34789/diff/3/?file=976968#file976968line134>
> >
> >     Seems like blockingCoordinatorRequest/initiateCoordinatorRequest logic 
> > had to be expanded. This same basic code seems to be duplicated in multiple 
> > places now. Any way to avoid the duplication?
> >     
> >     I see that there's some code now in the middle of those blocks that 
> > differs (the code that used to come after the blocking request finished), 
> > but these are fairly large chunks of code.

I think we can bring back initiateCoordinatorRequest in a reduced form (maybe 
just createCoordinatorRequest).
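
Here is a sketch of the reduced helper. It assumes a hypothetical `CoordinatorRequests` class whose `createCoordinatorRequest` holds the shared boilerplate (the coordinator check and the send) and takes the caller-specific response handling as a callback. Response delivery is simulated synchronously here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class CoordinatorRequests {
    final List<String> sent = new ArrayList<>();
    String coordinator = null;  // null means the coordinator is not known yet

    // Shared boilerplate: check for a coordinator, send, and route the reply to the caller's
    // handler. Returns false when nothing was sent so the caller can retry after discovery.
    boolean createCoordinatorRequest(String apiName, Consumer<String> onResponse) {
        if (coordinator == null) {
            return false;
        }
        sent.add(apiName);
        onResponse.accept(apiName + " response from " + coordinator);  // simulated reply
        return true;
    }

    // Callers keep only the code that actually differs between them.
    void commitOffsets(List<String> log) {
        createCoordinatorRequest("OffsetCommit", resp -> log.add("commit handled: " + resp));
    }

    void fetchCommittedOffsets(List<String> log) {
        createCoordinatorRequest("OffsetFetch", resp -> log.add("fetch handled: " + resp));
    }
}
```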


> On June 3, 2015, 6:44 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java,
> >  line 196
> > <https://reviews.apache.org/r/34789/diff/3/?file=976972#file976972line196>
> >
> >     This naming threw me off when I was looking at callers: at first I 
> > thought offsetResetNeeded() was checking the value and 
> > offsetResetNeeded(TopicPartition) was setting a flag that the 
> > TopicPartition needed offsets reset. Maybe rename these to 
> > isOffsetResetNeeded to clarify, leaving needOffsetReset() to set the flags?

Yeah, I agree. I was trying to stick with the convention that was already in 
use, but it breaks down in this case. I'll rename as you suggest.
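
For reference, here is a minimal sketch of the proposed naming. It uses a hypothetical `ResetFlags` class with string partitions in place of SubscriptionState and TopicPartition. Queries read `isOffsetResetNeeded`, and the mutator is `needOffsetReset`.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of the proposed naming; partitions are plain strings here.
final class ResetFlags {
    private final Set<String> resetNeeded = new HashSet<>();

    /** Mutator: marks the partition as needing an offset reset. */
    void needOffsetReset(String partition) { resetNeeded.add(partition); }

    /** Query: does any partition need an offset reset? */
    boolean isOffsetResetNeeded() { return !resetNeeded.isEmpty(); }

    /** Query: does this particular partition need an offset reset? */
    boolean isOffsetResetNeeded(String partition) { return resetNeeded.contains(partition); }
}
```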


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
-----------------------------------------------------------


On June 3, 2015, 12:10 a.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 3, 2015, 12:10 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResponse.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResponse.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
