> On Sept. 1, 2015, 1:02 a.m., Jun Rao wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, lines 
> > 362-367
> > <https://reviews.apache.org/r/36858/diff/5/?file=1038488#file1038488line362>
> >
> >     We need to think through this logic a bit more. The patch here 
> > disconnects the connection from the selector, but doesn't mark the 
> > connectionState as disconnected immediately. Only until the next 
> > networkClient.poll(), does the connectionState change to disconnected. The 
> > issue with this approach is that selector.disconnect actually cancels the 
> > socket key. So, at that moment, the connection is no longer usable. 
> > However, the connectionState is still connected. A client can check the 
> > connection as ready and then makes a send call. The send will then get a 
> > CancelledKeyException, which is weird.
> >     
> >     We are dealing with a similar issue in KAFKA-2411. Our current thinking 
> > is to have a networkClient.disconnect() that closes the socket key as well 
> > as removes the client from connectionState. This will make the state in 
> > networkClient consistent after each poll() call. If we have that, we can 
> > just call networkClient.disconnect() in handleTimedOutRequests() and handle 
> > those disconnected connections immediately. Then, we don't need to maintain 
> > the clientDisconnects state in Selector.
> 
> Mayuresh Gharat wrote:
>     Thanks a lot Jun.
>     I was thinking about something similar when I was rebasing the patch 
> yesterday with latest trunk. So the initial code that I wrote, was taking the 
> disconnected nodes and adding them to disconnected list in disconnect() in 
> Selector. But that imposes dpendency that handleTimeout() should be called 
> before handledisconnections(), because every poll clears the disconnected 
> list. I will test the approach you suggested and upload a patch for this.

Thinking more on this I feel that we can add the explicitly disconnected node 
in disconnect() function of Selector to the List<disconnected> in selector and 
call the handleTimeOut() before handleDisconnections() in NetworkClient. That 
will handle all the above requirements in the same poll(). The only issue with 
this is that as per the javadoc for disconnect() ("*The disconnection is 
asynchronous and will not be processed until the next {@link #poll(long) 
poll()} call.*")
The reason we need to have the explicitly disconnected node in to the selectors 
disconnected list is we need to have all the functionality in 
handleDisconnections() (metadataUpdate, clearing the inflight request for that 
node) and don't want to duplicate code.

The other way is maintain a list of disconnected node from handleTimeOut. In 
handleDisconnections iterate over the merged list of disconnected nodes from 
this list and the selectors disconnected list. This takes into consideration 
the approach that you mentioned above. What do you think?


- Mayuresh


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36858/#review97213
-----------------------------------------------------------


On Aug. 12, 2015, 5:59 p.m., Mayuresh Gharat wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36858/
> -----------------------------------------------------------
> 
> (Updated Aug. 12, 2015, 5:59 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2120
>     https://issues.apache.org/jira/browse/KAFKA-2120
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> Solved compile error
> 
> 
> Addressed Jason's comments for Kip-19
> 
> 
> Addressed Jun's comments
> 
> 
> Addressed Jason's comments about the default values for requestTimeout
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> dc8f0f115bcda893c95d17c0a57be8d14518d034 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> 2c421f42ed3fc5d61cf9c87a7eaa7bb23e26f63b 
>   clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 
> 15d00d4e484bb5d51a9ae6857ed6e024a2cc1820 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 7ab2503794ff3aab39df881bd9fbae6547827d3b 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 0e51d7bd461d253f4396a5b6ca7cd391658807fa 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> d35b421a515074d964c7fccb73d260b847ea5f00 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> ed99e9bdf7c4ea7a6d4555d4488cf8ed0b80641b 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  9517d9d0cd480d5ba1d12f1fde7963e60528d2f8 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> 03b8dd23df63a8d8a117f02eabcce4a2d48c44f7 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> aa264202f2724907924985a5ecbe74afc4c6c04b 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java
>  4cb1e50d6c4ed55241aeaef1d3af09def5274103 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
>  a152bd7697dca55609a9ec4cfe0a82c10595fbc3 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordBatch.java
>  06182db1c3a5da85648199b4c0c98b80ea7c6c0c 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> 0baf16e55046a2f49f6431e01d52c323c95eddf0 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> ce20111ac434eb8c74585e9c63757bb9d60a832f 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 9133d85342b11ba2c9888d4d2804d181831e7a8e 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 43238ceaad0322e39802b615bb805b895336a009 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/BufferPoolTest.java
>  2c693824fa53db1e38766b8c66a0ef42ef9d0f3a 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
>  5b2e4ffaeab7127648db608c179703b27b577414 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
>  8b1805d3d2bcb9fe2bacb37d870c3236aa9532c4 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 158f9829ff64a969008f699e40c51e918287859e 
>   core/src/main/scala/kafka/tools/ProducerPerformance.scala 
> 0335cc64013ffe2cdf1c4879e86e11ec8c526712 
>   core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
> ee94011894b46864614b97bbd2a98375a7d3f20b 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> eb169d8b33c27d598cc24e5a2e5f78b789fa38d3 
> 
> Diff: https://reviews.apache.org/r/36858/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Mayuresh Gharat
> 
>

Reply via email to