> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 995
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line995>
> >
> >     Should we pass in tp to isOffsetResetNeeded()?

Yes, we should. I'll fix it.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 797-798
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797>
> >
> >     Hmm, seekToBeginning() is supposed to be a blocking call. Basically, at 
> > the end of the call, we expect the fetch offset to be set to the beginning. 
> > This is now changed to async, which doesn't match the intended behavior. We 
> > need to think through if this matters or not.
> >     
> >     Ditto for seekToEnd().

Since we always update fetch positions before a new fetch and in position(), it 
didn't seem necessary to make it synchronous. I thought this handling might be 
more consistent with how new subscriptions are handled (which are asynchronous 
and defer the initial offset fetch until the next poll or position). That being 
said, I don't have a strong feeling about it, so we could return to the 
blocking version.
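
To make the deferred handling concrete, here is a minimal sketch. The class, 
the string partition keys, and the stubbed lookupOffset() are made up for 
illustration and are not the code in the patch; the point is only that the 
seek records a pending reset and the lookup happens on the next position().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch, not the patch code: seekToBeginning() only marks the
// partition as needing a reset; the lookup is deferred until position().
public class LazyResetSketch {
    public enum ResetStrategy { EARLIEST, LATEST }

    private final Map<String, ResetStrategy> pendingResets = new HashMap<>();
    private final Map<String, Long> positions = new HashMap<>();
    public int lookups = 0; // counts simulated broker lookups

    public void seekToBeginning(String partition) {
        positions.remove(partition);
        pendingResets.put(partition, ResetStrategy.EARLIEST);
    }

    public boolean isOffsetResetNeeded(String partition) {
        return pendingResets.containsKey(partition);
    }

    public long position(String partition) {
        updateFetchPosition(partition);
        Long position = positions.get(partition);
        if (position == null)
            throw new IllegalStateException("No position for " + partition);
        return position;
    }

    // Resolves a pending reset, if any, before the position is read.
    private void updateFetchPosition(String partition) {
        ResetStrategy strategy = pendingResets.remove(partition);
        if (strategy != null)
            positions.put(partition, lookupOffset(strategy));
    }

    // Stand-in for the blocking offset request to the broker (assumed values).
    private long lookupOffset(ResetStrategy strategy) {
        lookups++;
        return strategy == ResetStrategy.EARLIEST ? 0L : 100L;
    }
}
```

With this shape, the caller only pays for the lookup when it next asks for 
the position or fetches, which is the same trade-off as a new subscription.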


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 1039-1040
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039>
> >
> >     The returned response may be ready already after the offsetBefore call 
> > due to needing metadata refresh. Since we don't check the ready state 
> > immediately afterward, we may be delaying the processing of metadata 
> > refresh by the request timeout.

This is a pretty good point. One of the reasons working with NetworkClient is 
tricky is that you need several polls to complete a request: one to connect, 
one to send, and one to receive. In this case, the result might not be ready 
because we are in the middle of connecting to the broker, in which case we need 
to call poll() to finish the connect. If we don't, then the next request will 
just fail for the same reason. I'll look to see if there's a way to fix this to 
avoid unnecessary calls to poll.
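
As a toy illustration of the connect/send/receive sequence, the sketch below 
uses a fake client that needs three polls before the result is ready. 
FakeClient and this DelayedResult are simplified stand-ins I made up for the 
example, not the real NetworkClient or the class in the patch. The loop checks 
the ready flag before each poll, so a result that is already complete costs no 
poll at all.

```java
// Hypothetical sketch: a result that only becomes ready after several polls.
public class PollUntilReadySketch {
    public static class DelayedResult {
        public boolean ready;
        public String value;
    }

    // Fake client: one poll to connect, one to send, one to receive.
    public static class FakeClient {
        public enum State { DISCONNECTED, CONNECTED, SENT }

        public State state = State.DISCONNECTED;
        public int polls = 0;

        public void poll(DelayedResult result) {
            polls++;
            switch (state) {
                case DISCONNECTED:
                    state = State.CONNECTED;
                    break;
                case CONNECTED:
                    state = State.SENT;
                    break;
                case SENT:
                    result.value = "response";
                    result.ready = true;
                    break;
            }
        }
    }

    // Check readiness first, then poll only while the result is incomplete.
    public static String await(FakeClient client, DelayedResult result) {
        while (!result.ready)
            client.poll(result);
        return result.value;
    }
}
```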


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 1139-1141
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1139>
> >
> >     In the async mode, response may not be ready in the first iteration. 
> > Are we handling the retry properly in that case?

When an async commit request fails, we do not retry, which is consistent with 
the current consumer. I think Ewen's patch for KAFKA-2123 introduces a good 
approach to retrying async commits. My own preference is to let them fail fast 
as long as the user has the callback from KAFKA-2123 to handle their failure. 
Otherwise, it gets a little tricky trying to preserve their order.
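
A rough sketch of the fail-fast behavior I mean is below. The CommitCallback 
interface and the sender function are hypothetical (the real callback is 
whatever KAFKA-2123 ends up defining), and the send is simulated synchronously 
to keep the example short.

```java
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: one attempt per commit, errors go to the callback.
public class AsyncCommitSketch {
    public interface CommitCallback {
        void onComplete(Map<String, Long> offsets, Exception error);
    }

    public int attempts = 0;

    // sender stands in for the network round trip; it returns null on
    // success or the failure cause otherwise. There is no retry here.
    public void commitAsync(Map<String, Long> offsets,
                            Function<Map<String, Long>, Exception> sender,
                            CommitCallback callback) {
        attempts++;
        Exception error = sender.apply(offsets);
        callback.onComplete(offsets, error);
    }
}
```

The ordering concern is that if a commit of offset 10 fails and is retried 
after a later commit of offset 20 has succeeded, the retry would move the 
committed offset backwards. Leaving the decision to the callback avoids that.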


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1195
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1195>
> >
> >     We probably need to make some changes here when KAFKA-2120 is done to 
> > handle the request timeout properly. Perhaps we can add a TODO comment 
> > here.

I'll add a note. This also falls in the purview of KAFKA-1894.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java,
> >  line 16
> > <https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16>
> >
> >     Do we need NONE?

It was there before, but I don't think it's actually used. I'd be fine removing 
it.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java,
> >  lines 20-27
> > <https://reviews.apache.org/r/34789/diff/8/?file=980080#file980080line20>
> >
> >     Perhaps we can define a static method to initialize the constant and set 
> > the state. It's clearer that way since the instantiation and the 
> > initialization are in the same place. With this, we probably don't need the 
> > static getter methods and can just let the caller use the static constants 
> > directly.
> >     
> >     Ditto for BrokerResult.

Agreed. I'll fix it.
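
Something along these lines, I think. The class, constant, and enum names 
below are placeholders for illustration and not necessarily what will end up 
in CoordinatorResult or BrokerResult.

```java
// Hypothetical sketch: constants are created and initialized in one place by
// a private static factory, and callers use the constants directly.
public final class ResultSketch {
    public enum RemedyAction { NONE, REFRESH_METADATA, FIND_COORDINATOR }

    public static final ResultSketch NEED_METADATA_REFRESH =
            newConstant(RemedyAction.REFRESH_METADATA);
    public static final ResultSketch NEED_NEW_COORDINATOR =
            newConstant(RemedyAction.FIND_COORDINATOR);

    private boolean ready = false;
    private RemedyAction remedy = RemedyAction.NONE;

    private ResultSketch() {
    }

    // Instantiation and state setup happen together, so no static getters
    // are needed to finish initializing a constant.
    private static ResultSketch newConstant(RemedyAction remedy) {
        ResultSketch result = new ResultSketch();
        result.ready = true;
        result.remedy = remedy;
        return result;
    }

    public boolean isReady() {
        return ready;
    }

    public RemedyAction remedy() {
        return remedy;
    }
}
```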


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1212
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212>
> >
> >     -1 makes the pollClient block forever. So, we don't get a chance to do 
> > the wakeup check.

I might be wrong, but I think we can still use NetworkClient.wakeup to 
interrupt a poll call which is waiting forever.
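
My reasoning, assuming NetworkClient.wakeup ends up calling wakeup() on the 
underlying java.nio Selector and that a negative timeout maps to a select() 
with no timeout: a plain NIO selector blocked indefinitely in select() returns 
as soon as another thread calls wakeup(). The standalone sketch below uses 
only java.nio, not the Kafka classes.

```java
import java.io.IOException;
import java.nio.channels.Selector;

// Sketch using plain java.nio: wakeup() from another thread makes an
// indefinitely blocking select() return.
public class WakeupSketch {
    public static boolean wakeupInterruptsBlockingSelect() {
        try (Selector selector = Selector.open()) {
            Thread waker = new Thread(() -> {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                selector.wakeup();
            });
            waker.start();
            // No channels are registered, so only wakeup() can end this call.
            int readyKeys = selector.select();
            waker.join();
            return readyKeys == 0;
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

If that assumption about NetworkClient holds, the wakeup check can still run 
once poll returns, even with a timeout of -1.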


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 1078-1080
> > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1078>
> >
> >     Currently, our coding convention is not to wrap single-line statements 
> > with {}. There are a few other cases like this.

I'll do a pass and try to clean these up.


> On June 9, 2015, 7:58 p.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java,
> >  line 196
> > <https://reviews.apache.org/r/34789/diff/8/?file=980084#file980084line196>
> >
> >     To be consistent with the naming convention with the rest of the 
> > methods, should we just name it offsetResetNeeded()?

Haha, I actually used that convention initially, but it was a little confusing 
at times which method should be used. I can change it back, or we can add the 
"is" prefix to the other usages. Preferences?


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34789/#review87190
-----------------------------------------------------------


On June 5, 2015, 7:45 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34789/
> -----------------------------------------------------------
> 
> (Updated June 5, 2015, 7:45 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2168
>     https://issues.apache.org/jira/browse/KAFKA-2168
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2168; refactored callback handling to prevent unnecessary requests
> 
> 
> KAFKA-2168; address review comments
> 
> 
> KAFKA-2168; fix rebase error and checkstyle issue
> 
> 
> KAFKA-2168; address review comments and add docs
> 
> 
> KAFKA-2168; handle polling with timeout 0
> 
> 
> KAFKA-2168; timeout=0 means return immediately
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 8f587bc0705b65b3ef37c86e0c25bb43ab8803de 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 1ca75f83d3667f7d01da1ae2fd9488fb79562364 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d1d1ec178f60dc47d408f52a89e52886c1a093a2 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java f50da825756938c193d7f07bee953e000e2627d9 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 56281ee15cc33dfc96ff64d5b1e596047c7132a4 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe 
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java cee75410127dd1b86c1156563003216d93a086b3 
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 677edd385f35d4262342b567262c0b874876d25b 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 419541011d652becf0cda7a5e62ce813cddb1732 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ecc78cedf59a994fcf084fa7a458fe9ed5386b00 
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 
> 
> Diff: https://reviews.apache.org/r/34789/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
