-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review97710
-----------------------------------------------------------



clients/src/main/java/org/apache/kafka/clients/NetworkClient.java (lines 361 - 
366)
<https://reviews.apache.org/r/36858/#comment153716>

    But those closed connections are only added to the disconnected list on the 
next selector.select() call right? So, you still have the issue that after a 
networkClient.poll() call, some socket connections are already cancelled but 
the connectionState is not reflecting that (need to wait for next poll() call). 
Also, it's a bit weird that the disconnect is initated in NetworkClient, but we 
have to push the info through selector and get it back.
    
    I was thinking that we can call handleTimedOutRequests() after 
handleConnections(). In that call, we first figure out the nodeIds that need to 
be closed. Then call selector.close() and for each such node, call the 
following code.
    
                connectionStates.disconnected(node);
                log.debug("Node {} disconnected.", node);
                for (ClientRequest request : 
this.inFlightRequests.clearAll(node)) {
                    log.trace("Cancelled request {} due to node {} being 
disconnected", request, node);
                    if (!metadataUpdater.maybeHandleDisconnection(request))
                        responses.add(new ClientResponse(request, now, true, 
null));
                }
    
    The above code can be put in a private method and be reused in 
handleDisconnected().
    
    Then, we can get rid of selector.disconect and replace existing usage with 
selector.close instead.


- Jun Rao


On Sept. 3, 2015, 10:12 p.m., Mayuresh Gharat wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36858/
> -----------------------------------------------------------
> 
> (Updated Sept. 3, 2015, 10:12 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2120
>     https://issues.apache.org/jira/browse/KAFKA-2120
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Solved compile error
> 
> 
> Addressed Jason's comments for Kip-19
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's comments about the default values for requestTimeout
> 
> 
> checkpoint
> 
> 
> Addressed Joel's concerns. Also tried to include Jun's feedback.
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> dc8f0f115bcda893c95d17c0a57be8d14518d034 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 7d24c6f5dd2b63b96584f3aa8922a1d048dc1ae4 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 7ab2503794ff3aab39df881bd9fbae6547827d3b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> b31f7f1fbf93d29268b93811c9aad3e3c18e5312 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> b9a2d4e2bc565f0ee72b27791afe5c894af262f1 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 938981c23ec16dfaf81d1e647929a59e1572f40f 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 804d569498396d431880641041fc9292076452cb 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 06f00a99a73a288df9afa8c1d4abe3580fa968a6 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  4cb1e50d6c4ed55241aeaef1d3af09def5274103 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> d2e64f7cd8bf56e433a210905b2874f71eee9ea0 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> f49d54cbc1915ac686ff70ac657f08e4c96489c1 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 9133d85342b11ba2c9888d4d2804d181831e7a8e 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 43238ceaad0322e39802b615bb805b895336a009 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
>  2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  5b2e4ffaeab7127648db608c179703b27b577414 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  aa44991777a855f4b7f4f7bf17107c69393ff8ff 
>   clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java 
> df1205c935bee9a30a50816dbade64d6014b1ef2 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 3a684d98b05cadfb25c6f7f9a038ef1f6697edbf 
>   core/src/main/scala/kafka/tools/ProducerPerformance.scala 
> 0335cc64013ffe2cdf1c4879e86e11ec8c526712 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> 1198df02ddd7727269e84a751ba99520f6d5584a 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> 09b8444c2add87f0f70dbb182e892977a6b5c243 
> 
> Diff: https://reviews.apache.org/r/36858/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>

Reply via email to