> On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 995 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line995> > > > > Should we pass in tp to isOffsetResetNeeded()?
Yes, we should. I'll fix it. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > lines 797-798 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line797> > > > > Hmm, seekToBegining() is supposed to be a blocking call. Basically, at > > the end of the call, we expect the fetch offset to be set to the beginning. > > This is now changed to async, which doesn't match the intended behavior. We > > need to think through if this matters or not. > > > > Ditto for seekToEnd(). Since we always update fetch positions before a new fetch and in position(), it didn't seem necessary to make it synchronous. I thought this handling might be more consistent with how new subscriptions are handled (which are asynchronous and defer the initial offset fetch until the next poll or position). That being said, I don't have a strong feeling about it, so we could return to the blocking version. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > lines 1039-1040 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1039> > > > > The returned response may be ready already after the offsetBefore call > > due to needing metadata refresh. Since we don't check the ready state > > immediately afterward, we may be delaying the processing of metadata > > refresh by the request timeout. This is a pretty good point. One of the reasons working with NetworkClient is tricky is that you need several polls to complete a request: one to connect, one to send, and one to receive. In this case, the result might not be ready because we are in the middle of connecting to the broker, in which case we need to call poll() to finish the connect. If we don't, then then next request will just fail for the same reason. I'll look to see if there's a way to fix this to avoid unnecessary calls to poll. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > lines 1139-1141 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1139> > > > > In the async mode, response may not be ready in the first iteration. > > Are we handling the retry properly in that case? When an async commit request fails, we do not retry, which is consistent with the current consumer. I think Ewen's patch for KAFKA-2123 introduces a good approach to retrying async commits. My own preference is to let them fail fast as long as the user has the callback from KAFKA-2123 to handle their failure. Otherwise, it gets a little tricky trying to preserve their order. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 1195 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1195> > > > > We probably need to make some changes here when KAFKA-2120 is done to > > handle the request timeout propertly. Perhaps we can add a TODO comment > > here. I'll add a note. This also falls in the purview of KAFKA-1894. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java, > > line 16 > > <https://reviews.apache.org/r/34789/diff/8/?file=980077#file980077line16> > > > > Do we need NONE? It was there before, but I don't think it's actually used. I'd be fine removing it. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java, > > lines 20-27 > > <https://reviews.apache.org/r/34789/diff/8/?file=980080#file980080line20> > > > > Perhaps we can define a static method to initalize the constant and set > > the state. It's clearer that way since the instantiation and the > > initialization are in the same place. With this, we probably don't need the > > static getter methods and can just let the caller use the static constants > > directly. > > > > Ditto for BrokerResult. Agreed. I'll fix it. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 1212 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1212> > > > > -1 makes the pollClient block forever. So, we don't get a chance to do > > the wakeup check. I might be wrong, but I think we can still use NetworkClient.wakeup to interrupt a poll call which is waiting forever. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > lines 1078-1080 > > <https://reviews.apache.org/r/34789/diff/8/?file=980075#file980075line1078> > > > > Currently, our coding convention is not to wrap single line statement > > with {}. There are a few other cases like this. I'll do a pass and try to clean these up. > On June 9, 2015, 7:58 p.m., Jun Rao wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java, > > line 196 > > <https://reviews.apache.org/r/34789/diff/8/?file=980084#file980084line196> > > > > To be consistent with the naming convention with the rest of the > > methods, should we just name it offsetRestNeeded()? Haha, I actually used that convention initially, but it was a little confusing at times which method should be used. I can change it back, or we can add the "is" prefix to the other usages. Preferences? - Jason ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34789/#review87190 ----------------------------------------------------------- On June 5, 2015, 7:45 p.m., Jason Gustafson wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/34789/ > ----------------------------------------------------------- > > (Updated June 5, 2015, 7:45 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2168 > https://issues.apache.org/jira/browse/KAFKA-2168 > > > Repository: kafka > > > Description > ------- > > KAFKA-2168; refactored callback handling to prevent unnecessary requests > > > KAFKA-2168; address review comments > > > KAFKA-2168; fix rebase error and checkstyle issue > > > KAFKA-2168; address review comments and add docs > > > KAFKA-2168; handle polling with timeout 0 > > > KAFKA-2168; timeout=0 means return immediately > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > 8f587bc0705b65b3ef37c86e0c25bb43ab8803de > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java > 1ca75f83d3667f7d01da1ae2fd9488fb79562364 > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerWakeupException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > d1d1ec178f60dc47d408f52a89e52886c1a093a2 > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > f50da825756938c193d7f07bee953e000e2627d9 > > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetResetStrategy.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/BrokerResult.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java > c1496a0851526f3c7d3905ce4bdff2129c83a6c1 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/CoordinatorResult.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedResult.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java > 56281ee15cc33dfc96ff64d5b1e596047c7132a4 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java > e7cfaaad296fa6e325026a5eee1aaf9b9c0fe1fe > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java > cee75410127dd1b86c1156563003216d93a086b3 > > clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java > 677edd385f35d4262342b567262c0b874876d25b > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java > b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java > 419541011d652becf0cda7a5e62ce813cddb1732 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java > ecc78cedf59a994fcf084fa7a458fe9ed5386b00 > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java > e000cf8e10ebfacd6c9ee68d7b88ff8c157f73c6 > > Diff: https://reviews.apache.org/r/34789/diff/ > > > Testing > ------- > > > Thanks, > > Jason Gustafson > >