----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36858/#review97710 -----------------------------------------------------------
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (lines 361 - 366) <https://reviews.apache.org/r/36858/#comment153716> But those closed connections are only added to the disconnected list on the next selector.select() call right? So, you still have the issue that after a networkClient.poll() call, some socket connections are already cancelled but the connectionState is not reflecting that (need to wait for next poll() call). Also, it's a bit weird that the disconnect is initated in NetworkClient, but we have to push the info through selector and get it back. I was thinking that we can call handleTimedOutRequests() after handleConnections(). In that call, we first figure out the nodeIds that need to be closed. Then call selector.close() and for each such node, call the following code. connectionStates.disconnected(node); log.debug("Node {} disconnected.", node); for (ClientRequest request : this.inFlightRequests.clearAll(node)) { log.trace("Cancelled request {} due to node {} being disconnected", request, node); if (!metadataUpdater.maybeHandleDisconnection(request)) responses.add(new ClientResponse(request, now, true, null)); } The above code can be put in a private method and be reused in handleDisconnected(). Then, we can get rid of selector.disconect and replace existing usage with selector.close instead. - Jun Rao On Sept. 3, 2015, 10:12 p.m., Mayuresh Gharat wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/36858/ > ----------------------------------------------------------- > > (Updated Sept. 3, 2015, 10:12 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-2120 > https://issues.apache.org/jira/browse/KAFKA-2120 > > > Repository: kafka > > > Description > ------- > > Solved compile error > > > Addressed Jason's comments for Kip-19 > > > Addressed Jun's comments > > > Addressed Jason's comments about the default values for requestTimeout > > > checkpoint > > > Addressed Joel's concerns. Also tried to include Jun's feedback. > > > Diffs > ----- > > clients/src/main/java/org/apache/kafka/clients/ClientRequest.java > dc8f0f115bcda893c95d17c0a57be8d14518d034 > clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java > 7d24c6f5dd2b63b96584f3aa8922a1d048dc1ae4 > clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java > 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java > 7ab2503794ff3aab39df881bd9fbae6547827d3b > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > b31f7f1fbf93d29268b93811c9aad3e3c18e5312 > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > b9a2d4e2bc565f0ee72b27791afe5c894af262f1 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > 938981c23ec16dfaf81d1e647929a59e1572f40f > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java > 9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > 804d569498396d431880641041fc9292076452cb > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 06f00a99a73a288df9afa8c1d4abe3580fa968a6 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java > 4cb1e50d6c4ed55241aeaef1d3af09def5274103 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java > a152bd7697dca55609a9ec4cfe0a82c10595fbc3 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java > 06182db1c3a5da85648199b4c0c98b80ea7c6c0c > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > d2e64f7cd8bf56e433a210905b2874f71eee9ea0 > clients/src/main/java/org/apache/kafka/common/network/Selector.java > f49d54cbc1915ac686ff70ac657f08e4c96489c1 > clients/src/test/java/org/apache/kafka/clients/MockClient.java > 9133d85342b11ba2c9888d4d2804d181831e7a8e > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 43238ceaad0322e39802b615bb805b895336a009 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java > 2c693824fa53db1e38766b8c66a0ef42ef9d0f3a > > clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java > 5b2e4ffaeab7127648db608c179703b27b577414 > > clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java > aa44991777a855f4b7f4f7bf17107c69393ff8ff > clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java > df1205c935bee9a30a50816dbade64d6014b1ef2 > clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java > 3a684d98b05cadfb25c6f7f9a038ef1f6697edbf > core/src/main/scala/kafka/tools/ProducerPerformance.scala > 0335cc64013ffe2cdf1c4879e86e11ec8c526712 > core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala > 1198df02ddd7727269e84a751ba99520f6d5584a > core/src/test/scala/unit/kafka/utils/TestUtils.scala > 09b8444c2add87f0f70dbb182e892977a6b5c243 > > Diff: https://reviews.apache.org/r/36858/diff/ > > > Testing > ------- > > > Thanks, > > Mayuresh Gharat > >