Repository: kafka
Updated Branches:
  refs/heads/trunk ab76dbd1f -> 017c00caf


kafka-2266; Client Selector can drop idle connections without notifying NetworkClient; patched by Jason Gustafson; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/017c00ca
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/017c00ca
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/017c00ca

Branch: refs/heads/trunk
Commit: 017c00caf44aaad3418cb99d3ef42c4d1b066ddd
Parents: ab76dbd
Author: Jason Gustafson <[email protected]>
Authored: Fri Jun 12 10:24:54 2015 -0700
Committer: Jun Rao <[email protected]>
Committed: Fri Jun 12 10:24:54 2015 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/common/network/Selector.java |  2 ++
 .../apache/kafka/common/network/SelectorTest.java | 17 ++++++++++++++++-
 2 files changed, 18 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/017c00ca/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index effb1e6..1da215b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -393,6 +393,8 @@ public class Selector implements Selectable {
                     if (log.isTraceEnabled())
                         log.trace("About to close the idle connection from " + connectionId
                                 + " due to being idle for " + (currentTimeNanos - connectionLastActiveTime) / 1000 / 1000 + " millis");
+
+                    disconnected.add(connectionId);
                     close(connectionId);
                 }
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/017c00ca/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index d23b4b6..158f982 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -26,6 +26,7 @@ import java.util.*;
 
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
@@ -40,13 +41,15 @@ public class SelectorTest {
     private static final int BUFFER_SIZE = 4 * 1024;
 
     private EchoServer server;
+    private Time time;
     private Selectable selector;
 
     @Before
     public void setup() throws Exception {
         this.server = new EchoServer();
         this.server.start();
-        this.selector = new Selector(5000, new Metrics(), new MockTime() , "MetricGroup", new LinkedHashMap<String, String>());
+        this.time = new MockTime();
+        this.selector = new Selector(5000, new Metrics(), time, "MetricGroup", new LinkedHashMap<String, String>());
     }
 
     @After
@@ -244,6 +247,18 @@ public class SelectorTest {
         assertEquals("The response should be from the previously muted node", "1", selector.completedReceives().get(0).source());
     }
 
+
+    @Test
+    public void testCloseOldestConnection() throws Exception {
+        String id = "0";
+        blockingConnect(id);
+
+        time.sleep(6000); // The max idle time is 5000ms
+        selector.poll(0);
+
+        assertTrue("The idle connection should have been closed", selector.disconnected().contains(id));
+    }
+
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
         selector.poll(1000L);
