Repository: kafka
Updated Branches:
  refs/heads/trunk 18e34482e -> d01f01ec6


KAFKA-6260; Ensure selection keys are removed from all collections on socket 
close

When a socket is closed, we must remove the corresponding selection keys from 
the selector's internal collections. This fixes an NPE caused by attempting to 
access a selection key's attached channel after the attachment had been cleared 
on disconnect.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Rajini Sivaram 
<rajinisiva...@googlemail.com>

Closes #4276 from hachikuji/KAFKA-6260


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d01f01ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d01f01ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d01f01ec

Branch: refs/heads/trunk
Commit: d01f01ec63f61bd4742f02abff2ab6cf339e2897
Parents: 18e3448
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Dec 1 11:32:44 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Dec 1 11:32:44 2017 -0800

----------------------------------------------------------------------
 .../kafka/common/network/KafkaChannel.java      |   4 +
 .../common/network/PlaintextTransportLayer.java |  21 +--
 .../apache/kafka/common/network/Selector.java   | 162 ++++++++++---------
 .../kafka/common/network/SslTransportLayer.java |  14 +-
 .../kafka/common/network/TransportLayer.java    |  10 +-
 .../kafka/common/network/SelectorTest.java      | 115 ++++++++++---
 .../kafka/common/network/SslSelectorTest.java   |  27 ++++
 7 files changed, 232 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java 
b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index f07035a..e125bbc 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -117,6 +117,10 @@ public class KafkaChannel {
         return id;
     }
 
+    public SelectionKey selectionKey() {
+        return transportLayer.selectionKey();
+    }
+
     /**
      * externally muting a channel should be done via selector to ensure 
proper state handling
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
 
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 11c9565..ccb9c60 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -64,6 +64,11 @@ public class PlaintextTransportLayer implements 
TransportLayer {
     }
 
     @Override
+    public SelectionKey selectionKey() {
+        return key;
+    }
+
+    @Override
     public boolean isOpen() {
         return socketChannel.isOpen();
     }
@@ -73,20 +78,10 @@ public class PlaintextTransportLayer implements 
TransportLayer {
         return socketChannel.isConnected();
     }
 
-    /**
-     * Closes this channel
-     *
-     * @throws IOException If I/O error occurs
-     */
     @Override
     public void close() throws IOException {
-        try {
-            socketChannel.socket().close();
-            socketChannel.close();
-        } finally {
-            key.attach(null);
-            key.cancel();
-        }
+        socketChannel.socket().close();
+        socketChannel.close();
     }
 
     /**
@@ -191,7 +186,6 @@ public class PlaintextTransportLayer implements 
TransportLayer {
 
     /**
      * Adds the interestOps to selectionKey.
-     * @param ops
      */
     @Override
     public void addInterestOps(int ops) {
@@ -201,7 +195,6 @@ public class PlaintextTransportLayer implements 
TransportLayer {
 
     /**
      * Removes the interestOps from selectionKey.
-     * @param ops
      */
     @Override
     public void removeInterestOps(int ops) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/Selector.java 
b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 09df7b5..6bfcfd2 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -16,6 +16,23 @@
  */
 package org.apache.kafka.common.network;
 
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.MetricName;
+import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.metrics.Measurable;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Count;
+import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.common.metrics.stats.Meter;
+import org.apache.kafka.common.metrics.stats.SampledStat;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.slf4j.Logger;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
@@ -37,23 +54,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.metrics.Measurable;
-import org.apache.kafka.common.metrics.MetricConfig;
-import org.apache.kafka.common.MetricName;
-import org.apache.kafka.common.errors.AuthenticationException;
-import org.apache.kafka.common.metrics.Metrics;
-import org.apache.kafka.common.metrics.Sensor;
-import org.apache.kafka.common.metrics.stats.Avg;
-import org.apache.kafka.common.metrics.stats.Meter;
-import org.apache.kafka.common.metrics.stats.SampledStat;
-import org.apache.kafka.common.metrics.stats.Count;
-import org.apache.kafka.common.metrics.stats.Max;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.Time;
-import org.slf4j.Logger;
-
 /**
  * A nioSelector interface for doing non-blocking multi-connection network I/O.
  * <p>
@@ -195,12 +195,37 @@ public class Selector implements Selectable, 
AutoCloseable {
      */
     @Override
     public void connect(String id, InetSocketAddress address, int 
sendBufferSize, int receiveBufferSize) throws IOException {
-        if (this.channels.containsKey(id))
-            throw new IllegalStateException("There is already a connection for 
id " + id);
-        if (this.closingChannels.containsKey(id))
-            throw new IllegalStateException("There is already a connection for 
id " + id + " that is still being closed");
-
+        ensureNotRegistered(id);
         SocketChannel socketChannel = SocketChannel.open();
+        try {
+            configureSocketChannel(socketChannel, sendBufferSize, 
receiveBufferSize);
+            boolean connected = doConnect(socketChannel, address);
+            SelectionKey key = registerChannel(id, socketChannel, 
SelectionKey.OP_CONNECT);
+
+            if (connected) {
+                // OP_CONNECT won't trigger for immediately connected channels
+                log.debug("Immediately connected to node {}", id);
+                immediatelyConnectedKeys.add(key);
+                key.interestOps(0);
+            }
+        } catch (IOException | RuntimeException e) {
+            socketChannel.close();
+            throw e;
+        }
+    }
+
+    // Visible to allow test cases to override. In particular, we use this to 
implement a blocking connect
+    // in order to simulate "immediately connected" sockets.
+    protected boolean doConnect(SocketChannel channel, InetSocketAddress 
address) throws IOException {
+        try {
+            return channel.connect(address);
+        } catch (UnresolvedAddressException e) {
+            throw new IOException("Can't resolve address: " + address, e);
+        }
+    }
+
+    private void configureSocketChannel(SocketChannel socketChannel, int 
sendBufferSize, int receiveBufferSize)
+            throws IOException {
         socketChannel.configureBlocking(false);
         Socket socket = socketChannel.socket();
         socket.setKeepAlive(true);
@@ -209,25 +234,6 @@ public class Selector implements Selectable, AutoCloseable 
{
         if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
             socket.setReceiveBufferSize(receiveBufferSize);
         socket.setTcpNoDelay(true);
-        boolean connected;
-        try {
-            connected = socketChannel.connect(address);
-        } catch (UnresolvedAddressException e) {
-            socketChannel.close();
-            throw new IOException("Can't resolve address: " + address, e);
-        } catch (IOException e) {
-            socketChannel.close();
-            throw e;
-        }
-        SelectionKey key = socketChannel.register(nioSelector, 
SelectionKey.OP_CONNECT);
-        KafkaChannel channel = buildChannel(socketChannel, id, key);
-
-        if (connected) {
-            // OP_CONNECT won't trigger for immediately connected channels
-            log.debug("Immediately connected to node {}", channel.id());
-            immediatelyConnectedKeys.add(key);
-            key.interestOps(0);
-        }
     }
 
     /**
@@ -245,19 +251,29 @@ public class Selector implements Selectable, 
AutoCloseable {
      * </p>
      */
     public void register(String id, SocketChannel socketChannel) throws 
IOException {
+        ensureNotRegistered(id);
+        registerChannel(id, socketChannel, SelectionKey.OP_READ);
+    }
+
+    private void ensureNotRegistered(String id) {
         if (this.channels.containsKey(id))
             throw new IllegalStateException("There is already a connection for 
id " + id);
         if (this.closingChannels.containsKey(id))
             throw new IllegalStateException("There is already a connection for 
id " + id + " that is still being closed");
+    }
 
-        SelectionKey key = socketChannel.register(nioSelector, 
SelectionKey.OP_READ);
-        buildChannel(socketChannel, id, key);
+    private SelectionKey registerChannel(String id, SocketChannel 
socketChannel, int interestedOps) throws IOException {
+        SelectionKey key = socketChannel.register(nioSelector, interestedOps);
+        KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, 
key);
+        this.channels.put(id, channel);
+        return key;
     }
 
-    private KafkaChannel buildChannel(SocketChannel socketChannel, String id, 
SelectionKey key) throws IOException {
-        KafkaChannel channel;
+    private KafkaChannel buildAndAttachKafkaChannel(SocketChannel 
socketChannel, String id, SelectionKey key) throws IOException {
         try {
-            channel = channelBuilder.buildChannel(id, key, maxReceiveSize, 
memoryPool);
+            KafkaChannel channel = channelBuilder.buildChannel(id, key, 
maxReceiveSize, memoryPool);
+            key.attach(channel);
+            return channel;
         } catch (Exception e) {
             try {
                 socketChannel.close();
@@ -266,9 +282,6 @@ public class Selector implements Selectable, AutoCloseable {
             }
             throw new IOException("Channel could not be created for socket " + 
socketChannel, e);
         }
-        key.attach(channel);
-        this.channels.put(id, channel);
-        return channel;
     }
 
     /**
@@ -386,20 +399,22 @@ public class Selector implements Selectable, 
AutoCloseable {
 
         if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || 
dataInBuffers) {
             Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
-            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets 
polled twice
 
-            //poll from channels that have buffered data (but nothing more 
from the underlying socket)
-            if (!keysWithBufferedRead.isEmpty()) {
+            // Poll from channels that have buffered data (but nothing more 
from the underlying socket)
+            if (dataInBuffers) {
+                keysWithBufferedRead.removeAll(readyKeys); //so no channel 
gets polled twice
                 Set<SelectionKey> toPoll = keysWithBufferedRead;
                 keysWithBufferedRead = new HashSet<>(); //poll() calls will 
repopulate if needed
                 pollSelectionKeys(toPoll, false, endSelect);
             }
-            //poll from channels where the underlying socket has more data
-            pollSelectionKeys(readyKeys, false, endSelect);
-            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
 
+            // Poll from channels where the underlying socket has more data
+            pollSelectionKeys(readyKeys, false, endSelect);
             // Clear all selected keys so that they are included in the ready 
count for the next select
             readyKeys.clear();
+
+            pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
+            immediatelyConnectedKeys.clear();
         } else {
             madeReadProgressLastPoll = true; //no work is also "progress"
         }
@@ -424,11 +439,9 @@ public class Selector implements Selectable, AutoCloseable 
{
      */
     // package-private for testing
     void pollSelectionKeys(Set<SelectionKey> selectionKeys,
-                                   boolean isImmediatelyConnected,
-                                   long currentTimeNanos) {
-        Iterator<SelectionKey> iterator = 
determineHandlingOrder(selectionKeys).iterator();
-        while (iterator.hasNext()) {
-            SelectionKey key = iterator.next();
+                           boolean isImmediatelyConnected,
+                           long currentTimeNanos) {
+        for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
             KafkaChannel channel = channel(key);
             long channelStartTimeNanos = recordTimePerConnection ? 
time.nanoseconds() : 0;
 
@@ -507,16 +520,13 @@ public class Selector implements Selectable, 
AutoCloseable {
     private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> 
selectionKeys) {
         //it is possible that the iteration order over selectionKeys is the 
same every invocation.
         //this may cause starvation of reads when memory is low. to address 
this we shuffle the keys if memory is low.
-        Collection<SelectionKey> inHandlingOrder;
-
         if (!outOfMemory && memoryPool.availableMemory() < lowMemThreshold) {
-            List<SelectionKey> temp = new ArrayList<>(selectionKeys);
-            Collections.shuffle(temp);
-            inHandlingOrder = temp;
+            List<SelectionKey> shuffledKeys = new ArrayList<>(selectionKeys);
+            Collections.shuffle(shuffledKeys);
+            return shuffledKeys;
         } else {
-            inHandlingOrder = selectionKeys;
+            return selectionKeys;
         }
-        return inHandlingOrder;
     }
 
     private void attemptRead(SelectionKey key, KafkaChannel channel) throws 
IOException {
@@ -642,19 +652,17 @@ public class Selector implements Selectable, 
AutoCloseable {
     /**
      * Check for data, waiting up to the given timeout.
      *
-     * @param ms Length of time to wait, in milliseconds, which must be 
non-negative
+     * @param timeoutMs Length of time to wait, in milliseconds, which must be 
non-negative
      * @return The number of keys ready
-     * @throws IllegalArgumentException
-     * @throws IOException
      */
-    private int select(long ms) throws IOException {
-        if (ms < 0L)
+    private int select(long timeoutMs) throws IOException {
+        if (timeoutMs < 0L)
             throw new IllegalArgumentException("timeout should be >= 0");
 
-        if (ms == 0L)
+        if (timeoutMs == 0L)
             return this.nioSelector.selectNow();
         else
-            return this.nioSelector.select(ms);
+            return this.nioSelector.select(timeoutMs);
     }
 
     /**
@@ -713,10 +721,16 @@ public class Selector implements Selectable, 
AutoCloseable {
     }
 
     private void doClose(KafkaChannel channel, boolean notifyDisconnect) {
+        SelectionKey key = channel.selectionKey();
         try {
+            immediatelyConnectedKeys.remove(key);
+            keysWithBufferedRead.remove(key);
             channel.close();
         } catch (IOException e) {
             log.error("Exception closing connection to node {}:", 
channel.id(), e);
+        } finally {
+            key.cancel();
+            key.attach(null);
         }
         this.sensors.connectionClosed.record();
         this.stagedReceives.remove(channel);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 51ebbc1..69ca037 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -131,6 +131,11 @@ public class SslTransportLayer implements TransportLayer {
     }
 
     @Override
+    public SelectionKey selectionKey() {
+        return key;
+    }
+
+    @Override
     public boolean isOpen() {
         return socketChannel.isOpen();
     }
@@ -169,13 +174,8 @@ public class SslTransportLayer implements TransportLayer {
         } catch (IOException ie) {
             log.warn("Failed to send SSL Close message ", ie);
         } finally {
-            try {
-                socketChannel.socket().close();
-                socketChannel.close();
-            } finally {
-                key.attach(null);
-                key.cancel();
-            }
+            socketChannel.socket().close();
+            socketChannel.close();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index 23f866b..3673d21 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -25,6 +25,7 @@ package org.apache.kafka.common.network;
  */
 import java.io.IOException;
 import java.nio.channels.FileChannel;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.ScatteringByteChannel;
 import java.nio.channels.GatheringByteChannel;
@@ -60,13 +61,16 @@ public interface TransportLayer extends 
ScatteringByteChannel, GatheringByteChan
      */
     SocketChannel socketChannel();
 
+    /**
+     * Get the underlying selection key
+     */
+    SelectionKey selectionKey();
 
     /**
      * This a no-op for the non-secure PLAINTEXT implementation. For SSL, this 
performs
      * SSL handshake. The SSL handshake includes client authentication if 
configured using
-     * {@link 
org.apache.kafka.common.config.SslConfigsSslConfigs#SSL_CLIENT_AUTH_CONFIG}.
-     * @throws AuthenticationException if handshake fails due to an
-     *         {@link javax.net.ssl.SSLExceptionSSLException}.
+     * {@link 
org.apache.kafka.common.config.SslConfigs#SSL_CLIENT_AUTH_CONFIG}.
+     * @throws AuthenticationException if handshake fails due to an {@link 
javax.net.ssl.SSLException}.
      * @throws IOException if read or write fails with an I/O error.
     */
     void handshake() throws AuthenticationException, IOException;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 36b847b..f1c6a5a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.IMocksControl;
 import org.junit.After;
@@ -41,6 +42,7 @@ import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -104,16 +106,29 @@ public class SelectorTest {
      */
     @Test
     public void testServerDisconnect() throws Exception {
-        String node = "0";
+        final String node = "0";
 
         // connect and do a simple request
         blockingConnect(node);
         assertEquals("hello", blockingRequest(node, "hello"));
 
+        KafkaChannel channel = selector.channel(node);
+
         // disconnect
         this.server.closeConnections();
-        while (!selector.disconnected().containsKey(node))
-            selector.poll(1000L);
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                try {
+                    selector.poll(1000L);
+                    return selector.disconnected().containsKey(node);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, 5000, "Failed to observe disconnected node in disconnected set");
+
+        assertNull(channel.selectionKey().attachment());
 
         // reconnect and do another request
         blockingConnect(node);
@@ -186,8 +201,8 @@ public class SelectorTest {
         for (int i = 0; i < conns; i++)
             connect(Integer.toString(i), addr);
         // send echo requests and receive responses
-        Map<String, Integer> requests = new HashMap<String, Integer>();
-        Map<String, Integer> responses = new HashMap<String, Integer>();
+        Map<String, Integer> requests = new HashMap<>();
+        Map<String, Integer> responses = new HashMap<>();
         int responseCount = 0;
         for (int i = 0; i < conns; i++) {
             String node = Integer.toString(i);
@@ -252,11 +267,6 @@ public class SelectorTest {
         sendAndReceive(node, requestPrefix, 0, reqs);
     }
 
-
-
-    /**
-     * Test sending an empty string
-     */
     @Test
     public void testEmptyRequest() throws Exception {
         String node = "0";
@@ -333,6 +343,7 @@ public class SelectorTest {
         assertNull("Channel not removed from closingChannels", 
selector.closingChannel(id));
         assertTrue("Unexpected disconnect notification", 
selector.disconnected().isEmpty());
         assertEquals(ChannelState.EXPIRED, channel.state());
+        assertNull(channel.selectionKey().attachment());
         selector.poll(0);
         assertTrue("Unexpected disconnect notification", 
selector.disconnected().isEmpty());
     }
@@ -350,6 +361,41 @@ public class SelectorTest {
     }
 
     @Test
+    public void testImmediatelyConnectedCleaned() throws Exception {
+        Metrics metrics = new Metrics(); // new metrics object to avoid metric 
registration conflicts
+        Selector selector = new Selector(5000, metrics, time, "MetricGroup", 
channelBuilder, new LogContext()) {
+            @Override
+            protected boolean doConnect(SocketChannel channel, 
InetSocketAddress address) throws IOException {
+                // Use a blocking connect to trigger the immediately connected 
path
+                channel.configureBlocking(true);
+                boolean connected = super.doConnect(channel, address);
+                channel.configureBlocking(false);
+                return connected;
+            }
+        };
+
+        try {
+            testImmediatelyConnectedCleaned(selector, true);
+            testImmediatelyConnectedCleaned(selector, false);
+        } finally {
+            selector.close();
+            metrics.close();
+        }
+    }
+
+    private void testImmediatelyConnectedCleaned(Selector selector, boolean 
closeAfterFirstPoll) throws Exception {
+        String id = "0";
+        selector.connect(id, new InetSocketAddress("localhost", server.port), 
BUFFER_SIZE, BUFFER_SIZE);
+        verifyNonEmptyImmediatelyConnectedKeys(selector);
+        if (closeAfterFirstPoll) {
+            selector.poll(0);
+            verifyEmptyImmediatelyConnectedKeys(selector);
+        }
+        selector.close(id);
+        verifySelectorEmpty(selector);
+    }
+
+    @Test
     public void testCloseOldestConnectionWithOneStagedReceive() throws 
Exception {
         verifyCloseOldestConnectionWithStagedReceives(1);
     }
@@ -410,8 +456,6 @@ public class SelectorTest {
         assertTrue("Unexpected receive", 
selector.completedReceives().isEmpty());
     }
 
-
-
     @Test
     public void testMuteOnOOM() throws Exception {
         //clean up default selector, replace it with one that uses a finite 
mem pool
@@ -517,8 +561,11 @@ public class SelectorTest {
         expectLastCall().andThrow(new IOException());
 
         SelectionKey selectionKey = control.createMock(SelectionKey.class);
+        expect(kafkaChannel.selectionKey()).andStubReturn(selectionKey);
         expect(selectionKey.channel()).andReturn(SocketChannel.open());
         expect(selectionKey.readyOps()).andStubReturn(SelectionKey.OP_CONNECT);
+        selectionKey.cancel();
+        expectLastCall();
 
         control.replay();
 
@@ -528,6 +575,7 @@ public class SelectorTest {
 
         assertFalse(selector.connected().contains(kafkaChannel.id()));
         assertTrue(selector.disconnected().containsKey(kafkaChannel.id()));
+        assertNull(selectionKey.attachment());
 
         control.verify();
     }
@@ -551,6 +599,7 @@ public class SelectorTest {
     private void blockingConnect(String node) throws IOException {
         blockingConnect(node, new InetSocketAddress("localhost", server.port));
     }
+
     protected void blockingConnect(String node, InetSocketAddress serverAddr) 
throws IOException {
         selector.connect(node, serverAddr, BUFFER_SIZE, BUFFER_SIZE);
         while (!selector.connected().contains(node))
@@ -589,21 +638,41 @@ public class SelectorTest {
         }
     }
 
-    private void verifySelectorEmpty() throws Exception {
-        for (KafkaChannel channel : selector.channels())
+    private void verifyNonEmptyImmediatelyConnectedKeys(Selector selector) 
throws Exception {
+        Field field = 
Selector.class.getDeclaredField("immediatelyConnectedKeys");
+        field.setAccessible(true);
+        Collection<?> immediatelyConnectedKeys = (Collection<?>) 
field.get(selector);
+        assertFalse(immediatelyConnectedKeys.isEmpty());
+    }
+
+    private void verifyEmptyImmediatelyConnectedKeys(Selector selector) throws 
Exception {
+        Field field = 
Selector.class.getDeclaredField("immediatelyConnectedKeys");
+        ensureEmptySelectorField(selector, field);
+    }
+
+    protected void verifySelectorEmpty() throws Exception {
+        verifySelectorEmpty(this.selector);
+    }
+
+    private void verifySelectorEmpty(Selector selector) throws Exception {
+        for (KafkaChannel channel : selector.channels()) {
             selector.close(channel.id());
+            assertNull(channel.selectionKey().attachment());
+        }
         selector.poll(0);
         selector.poll(0); // Poll a second time to clear everything
-        for (Field field : selector.getClass().getDeclaredFields()) {
-            field.setAccessible(true);
-            Object obj = field.get(selector);
-            if (obj instanceof Set)
-                assertTrue("Field not empty: " + field + " " + obj, ((Set<?>) 
obj).isEmpty());
-            else if (obj instanceof Map)
-                assertTrue("Field not empty: " + field + " " + obj, ((Map<?, 
?>) obj).isEmpty());
-            else if (obj instanceof List)
-                assertTrue("Field not empty: " + field + " " + obj, ((List<?>) 
obj).isEmpty());
+        for (Field field : Selector.class.getDeclaredFields()) {
+            ensureEmptySelectorField(selector, field);
         }
     }
 
+    private void ensureEmptySelectorField(Selector selector, Field field) 
throws Exception {
+        field.setAccessible(true);
+        Object obj = field.get(selector);
+        if (obj instanceof Collection)
+            assertTrue("Field not empty: " + field + " " + obj, 
((Collection<?>) obj).isEmpty());
+        else if (obj instanceof Map)
+            assertTrue("Field not empty: " + field + " " + obj, ((Map<?, ?>) 
obj).isEmpty());
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d01f01ec/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java 
b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index bf2e77c..dc062ea 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -22,7 +22,9 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestSslUtils;
+import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -77,6 +79,30 @@ public class SslSelectorTest extends SelectorTest {
         return SecurityProtocol.PLAINTEXT;
     }
 
+    @Test
+    public void testDisconnectWithIntermediateBufferedBytes() throws Exception 
{
+        int requestSize = 100 * 1024;
+        final String node = "0";
+        connect(node, new InetSocketAddress("localhost", server.port));
+        String request = TestUtils.randomString(requestSize);
+        selector.send(createSend(node, request));
+
+        TestUtils.waitForCondition(new TestCondition() {
+            @Override
+            public boolean conditionMet() {
+                try {
+                    selector.poll(0L);
+                    return selector.channel(node).hasBytesBuffered();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }, 2000L, "Failed to reach socket state with bytes buffered");
+
+        selector.close(node);
+        verifySelectorEmpty();
+    }
+
     /**
      * Renegotiation is not supported since it is potentially unsafe and it 
has been removed in TLS 1.3
      */
@@ -197,4 +223,5 @@ public class SslSelectorTest extends SelectorTest {
     private SslSender createSender(InetSocketAddress serverAddress, byte[] 
payload) {
         return new SslSender(serverAddress, payload);
     }
+
 }

Reply via email to