-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review86338
-----------------------------------------------------------


This looks good so far. I think it's much easier to understand when all the 
blocking stuff happens at the KafkaConsumer level and each of the classes it 
uses only ever handles single requests. It'd be nice to document the basic 
architecture somewhere since it took me a bit to fully figure it out. 
(Unfortunately, since the user-facing javadocs for the consumer are in the 
implementation class KafkaConsumer instead of on the Consumer interface, we 
can't put these internal notes on the KafkaConsumer class without them ending 
up in the public docs...)

Some notes in addition to the inline stuff:

Some functionality has been pulled back up to KafkaConsumer, in a mild reversal 
of Guozhang's refactoring. It'd be nice to keep this to a minimum. The ones 
that stuck out to me were resetOffsets()/resetOffset()/offsetBefore(). I'm 
guessing you also couldn't figure out a way to keep it in Fetcher since the 
inner call to offsetBefore() requires that blocking loop?

The handling of DelayedResponse and its subclasses seems somewhat redundant and 
follows a common pattern, so it could perhaps be refactored into utility code. However, there
are few enough places it's happening now that I don't think it's a big deal. It 
does seem a bit wasteful that we have to continually create these 
DelayedResponse objects even in cases where we know we'll fail fast, but I 
suppose those cases should be unusual and the cost to allocate them isn't all 
that high.

Finally, a readability/cleanliness thing. This patch adds more nested anonymous 
RequestCompletionHandler classes. I think these are fine as they are, but if 
the implementations get too long or branchy with all the various error 
conditions they can become unreadably over-indented. Taking some of the big 
ones and using named nested classes might help improve clarity, although it 
does separate the request initiating code from the response handling code.
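To make the trade-off concrete, here is a minimal sketch of the same response handling written once as an anonymous class and once as a named nested class. CompletionHandler, HandlerOwner and CommitResponseHandler are hypothetical names; CompletionHandler is a simplified stand-in for RequestCompletionHandler and takes a String rather than a real response object.

```java
// Simplified, hypothetical stand-in for a request-completion callback interface.
interface CompletionHandler {
    void onComplete(String response);
}

// Hypothetical owner class, playing the role of something like Coordinator.
final class HandlerOwner {
    private String lastError;

    // Anonymous style: compact, and the handling sits right next to the send.
    CompletionHandler anonymousHandler() {
        return new CompletionHandler() {
            @Override
            public void onComplete(String response) {
                if (response.startsWith("ERROR")) {
                    lastError = response;
                }
            }
        };
    }

    // Named nested (inner) class: same behavior, but new error branches add
    // indentation here instead of at an already deeply nested call site.
    private final class CommitResponseHandler implements CompletionHandler {
        @Override
        public void onComplete(String response) {
            if (response.startsWith("ERROR")) {
                lastError = response;
            }
        }
    }

    CompletionHandler namedHandler() {
        return new CommitResponseHandler();
    }

    String lastError() {
        return lastError;
    }
}
```

Both handlers still write to the owner's state, so the named class keeps access to everything the anonymous one had; the cost is that the handler body no longer sits beside the code that sends the request.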


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138296>

    This seems like it'll be analogous to KIP-19's request.timeout.ms, since it 
looks like all the use cases send a single request and wait for a 
single response. I don't think there's any patch ready for that yet (check w/ 
the JIRA; maybe Jiangjie has something that hasn't been submitted yet), but 
if KIP-19 ends up accepted we could potentially add that setting in either patch.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138299>

    Might want to move this to trace level. Normal consumers are going to hit this 
*a lot*.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138304>

    I think this is simpler and clearer if you reorder these two -- reset the 
offsets that need it first, then update fetch positions for ones that are 
missing it. I think this removes the extra conditional 
!subscriptions.offsetResetNeeded(tp) that was added in updateFetchPositions too.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138305>

    Any reason not to do this? It definitely seems necessary, although 
the logic might be more complicated than just moving maybeHeartbeat() in here: 
some of the other lead-up to this could change if you saw a partition 
reassignment in the middle of a long poll, requiring fetch positions to be 
updated, etc.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138320>

    Besides grouping by node, we could also send these requests out in 
parallel. The drawback to simplifying this all to use a series of 
offsetBefore() calls is that each one blocks, so resetting a bunch of offsets is 
going to be pretty slow.
    
    The obvious solution is a utility that lets you send a bunch of requests in 
parallel, then do the same looping you're doing while waiting for a single response, 
but handle a bunch all at once.
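A minimal sketch of that utility, assuming every request has already been sent and each one is tracked by a future-like holder. PendingResult, PollingClient and ParallelRequests are hypothetical names, not the patch's DelayedResponse or the real network client: one poll loop runs until every result is ready, instead of one blocking loop per offsetBefore() call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical future-like holder, standing in for the patch's DelayedResponse.
class PendingResult<T> {
    private T value;
    private boolean done;

    void complete(T value) {
        this.value = value;
        this.done = true;
    }

    boolean isDone() { return done; }

    T value() { return value; }
}

// Hypothetical client: poll() performs network I/O and completes any
// results whose responses have arrived.
interface PollingClient {
    void poll(long timeoutMs);
}

final class ParallelRequests {
    private ParallelRequests() {}

    // Requests are assumed to be in flight already; a single poll loop waits
    // for all of them. A real version would also need an overall deadline.
    static <T> List<T> awaitAll(PollingClient client, List<PendingResult<T>> pending, long pollTimeoutMs) {
        while (!allDone(pending)) {
            client.poll(pollTimeoutMs);
        }
        List<T> values = new ArrayList<>(pending.size());
        for (PendingResult<T> result : pending) {
            values.add(result.value());
        }
        return values;
    }

    private static boolean allDone(List<? extends PendingResult<?>> pending) {
        for (PendingResult<?> result : pending) {
            if (!result.isDone()) {
                return false;
            }
        }
        return true;
    }
}
```

Total latency then tracks the slowest response rather than the sum of all of them.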



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138324>

    Since poll() can trigger auto offset commits, and the commits can then 
block while calling poll() for some time, can we end up recursing in some bad 
situations, e.g. if we consistently cannot get a coordinator?
    
    We might need to keep track if a commit is outstanding and not try to 
commit again, or just update the values we're trying to commit.
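A minimal sketch of the first option, assuming a simple flag is enough. AutoCommitGuard is a hypothetical name, partitions are plain Strings instead of TopicPartition, and the Runnable stands in for the blocking commit, which may call poll() and so re-enter maybeAutoCommit(). Nested calls only record newer offsets; whether the in-flight commit picks them up depends on when it reads the map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: stops poll() -> auto-commit -> poll() -> auto-commit
// recursion by remembering that a commit is already outstanding.
final class AutoCommitGuard {
    private final Map<String, Long> offsetsToCommit = new HashMap<>();
    private boolean commitInProgress = false;
    private int commitsStarted = 0;

    // Called from poll() when an auto-commit is due.
    void maybeAutoCommit(Map<String, Long> latestOffsets, Runnable blockingCommit) {
        // Always remember the newest offsets, even if no new commit starts.
        offsetsToCommit.putAll(latestOffsets);
        if (commitInProgress) {
            return; // a commit is already outstanding further up the stack
        }
        commitInProgress = true;
        commitsStarted++;
        try {
            blockingCommit.run();
        } finally {
            commitInProgress = false;
        }
    }

    int commitsStarted() { return commitsStarted; }

    Map<String, Long> offsetsToCommit() { return offsetsToCommit; }
}
```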



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/34789/#comment138323>

    In async mode, why bother polling at all here? You mentioned coordinator 
connection issues in the comment below, but if we need to handle that here, 
aren't we effectively making this a synchronous request? Or semi-synchronous 
since it has to wait for the coordinator?
    
    Or is this best left to a follow-up, since async commits have other open 
issues anyway: they need callbacks, and there were ordering issues in the 
implementation?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java
<https://reviews.apache.org/r/34789/#comment138326>

    The classes named XResponse may be a bit confusing because the protocol 
responses use that terminology. Future? Result?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java
<https://reviews.apache.org/r/34789/#comment138327>

    Also, probably worth adding some javadocs to explain how these classes 
work. They're similar to a future, but a single request may or may not be made, 
and the first issue encountered causes the Delayed/Broker/CoordinatorResponse 
to become ready with some sort of result. That result is either one of a small 
set of issues (for Broker/CoordinatorResponses) or the value obtained from a 
valid response from the broker.
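As a rough illustration of the kind of javadoc meant here, a sketch on a simplified class. RequestResult uses the "Result" naming floated above, and the Failure constants are hypothetical examples, not the patch's actual set.

```java
/**
 * Hypothetical sketch of the suggested documentation, on a simplified class.
 *
 * <p>A future-like holder for the outcome of at most one request. The request
 * may or may not actually be sent; the first outcome recorded, either a
 * failure or a value, makes the result ready, and later outcomes are ignored.
 *
 * @param <T> type of the value extracted from a valid broker response
 */
final class RequestResult<T> {
    /** Example failure reasons only; the real classes define their own set. */
    enum Failure { COORDINATOR_NOT_AVAILABLE, DISCONNECTED, NEEDS_METADATA_REFRESH }

    private boolean ready = false;
    private T value;
    private Failure failure;

    /** Records a value from a valid response, unless an outcome already exists. */
    void complete(T value) {
        if (ready) return;
        this.value = value;
        this.ready = true;
    }

    /** Records a failure, unless an outcome already exists. */
    void fail(Failure failure) {
        if (ready) return;
        this.failure = failure;
        this.ready = true;
    }

    boolean isReady() { return ready; }

    boolean succeeded() { return ready && failure == null; }

    T value() { return value; }

    Failure failure() { return failure; }
}
```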



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138294>

    You can get rid of this -- metadata is no longer used in this class. Nice 
work, that should help make the layering of these classes cleaner.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138331>

    Seems like blockingCoordinatorRequest/initiateCoordinatorRequest logic had 
to be expanded. This same basic code seems to be duplicated in multiple places 
now. Any way to avoid the duplication?
    
    I see that there's now some code in the middle of those blocks that differs 
(the code that used to come after the blocking request finished), but these are 
still fairly large chunks of code.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138333>

    The second half of this description no longer matches since there is no 
blocking commit here anymore.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
<https://reviews.apache.org/r/34789/#comment138337>

    time.milliseconds() is called 3x here; it looks like you should be able to 
call it once and reuse the same value.
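The pattern in general form, as a sketch: read the clock once into a local and use it for every comparison and update in the method. Time is a simplified stand-in for a clock abstraction with a milliseconds() method, and HeartbeatTimer is a hypothetical example, not the code under review.

```java
// Simplified, hypothetical stand-in for a clock abstraction.
interface Time {
    long milliseconds();
}

// Hypothetical heartbeat check illustrating a single clock read per call.
final class HeartbeatTimer {
    private final Time time;
    private final long intervalMs;
    private long lastSendMs;

    HeartbeatTimer(Time time, long intervalMs, long startMs) {
        this.time = time;
        this.intervalMs = intervalMs;
        this.lastSendMs = startMs;
    }

    // One read of the clock, so the check and the update see the same instant.
    boolean shouldHeartbeat() {
        long now = time.milliseconds();
        if (now - lastSendMs < intervalMs) {
            return false;
        }
        lastSendMs = now;
        return true;
    }
}
```

Besides saving calls, this keeps the method internally consistent: with repeated reads, the value compared and the value stored could differ by however long the method took.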



clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
<https://reviews.apache.org/r/34789/#comment138339>

    If we need this enum across multiple classes, maybe it should be its own 
standalone type instead of nested in one of them.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
<https://reviews.apache.org/r/34789/#comment138301>

    This naming threw me off when I was looking at callers; at first I thought 
offsetResetNeeded() was checking the value and 
offsetResetNeeded(TopicPartition) was setting a flag that the TopicPartition 
needed offsets reset. Maybe rename these to isOffsetResetNeeded to clarify, 
leaving needOffsetReset() to set the flags?
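A minimal sketch of the proposed naming. SubscriptionStateSketch is a hypothetical, trimmed-down stand-in that uses String partitions instead of TopicPartition, and it assumes the no-argument check means "any partition needs a reset".

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for SubscriptionState showing only the naming split.
final class SubscriptionStateSketch {
    private final Set<String> resetNeeded = new HashSet<>();

    /** Mutator: flags that this partition's offset must be reset. */
    void needOffsetReset(String partition) {
        resetNeeded.add(partition);
    }

    /** Query: does this partition need an offset reset? */
    boolean isOffsetResetNeeded(String partition) {
        return resetNeeded.contains(partition);
    }

    /** Query: does any partition need an offset reset? */
    boolean isOffsetResetNeeded() {
        return !resetNeeded.isEmpty();
    }
}
```

With the "is" prefix on both queries, a call site like isOffsetResetNeeded(tp) can no longer be mistaken for a setter.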


- Ewen Cheslack-Postava


On June 3, 2015, 12:10 a.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 3, 2015, 12:10 a.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResponse.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
