Repository: kafka
Updated Branches:
  refs/heads/0.11.0 3a515505e -> 693318d2a


KAFKA-5587; Remove channel only after staged receives are delivered

When idle connections are closed, ensure that channels with staged
receives are retained in `closingChannels` until all staged receives
are completed. Also ensure that only one staged receive is completed
in each poll, even when channels are closed.
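
To make the retention/drain behaviour concrete, here is a minimal, self-contained
sketch (plain Java, not the actual `Selector` implementation; the
`StagedReceiveDrainSketch` class and its fields and method names are invented for
illustration only): an expired channel that still has staged receives is parked in
a `closingChannels` map, each poll delivers at most one of its staged receives, and
the channel is removed only once its queue is drained.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.Deque;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Toy model only: illustrates "retain until drained, one receive per poll".
    public class StagedReceiveDrainSketch {

        private final Map<String, Deque<String>> stagedReceives = new HashMap<>();
        private final Map<String, Deque<String>> closingChannels = new HashMap<>();

        // Queue a receive for a still-open channel.
        void stage(String channelId, String receive) {
            stagedReceives.computeIfAbsent(channelId, k -> new ArrayDeque<>()).add(receive);
        }

        // Idle-expire a channel: if it has staged receives, retain it until they are delivered.
        void expire(String channelId) {
            Deque<String> deque = stagedReceives.remove(channelId);
            if (deque != null && !deque.isEmpty())
                closingChannels.put(channelId, deque);
        }

        // One poll: deliver at most one staged receive per closing channel,
        // and remove a channel only once its queue is drained.
        List<String> poll() {
            List<String> completed = new ArrayList<>();
            closingChannels.entrySet().removeIf(entry -> {
                Deque<String> deque = entry.getValue();
                if (!deque.isEmpty())
                    completed.add(entry.getKey() + ":" + deque.poll());
                return deque.isEmpty();
            });
            return completed;
        }

        public static void main(String[] args) {
            StagedReceiveDrainSketch selector = new StagedReceiveDrainSketch();
            selector.stage("0", "r1");
            selector.stage("0", "r2");
            selector.expire("0");                // expires with two staged receives
            System.out.println(selector.poll()); // [0:r1] -- channel still retained
            System.out.println(selector.poll()); // [0:r2] -- drained, channel removed
            System.out.println(selector.poll()); // []
        }
    }

The actual change below enforces this ordering in `Selector.poll()` by calling
`addToCompletedReceives()` only after `maybeCloseOldestConnection()`.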

Author: Rajini Sivaram <rajinisiva...@googlemail.com>

Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk>

Closes #3526 from rajinisivaram/KAFKA-5587

(cherry picked from commit 28c83d9667676515607713d4ccfd3757a8afcba2)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/693318d2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/693318d2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/693318d2

Branch: refs/heads/0.11.0
Commit: 693318d2aeaa89f68acccbd5ac1618f85a08130a
Parents: 3a51550
Author: Rajini Sivaram <rajinisiva...@googlemail.com>
Authored: Mon Jul 17 20:29:26 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Tue Jul 18 00:01:18 2017 +0100

----------------------------------------------------------------------
 clients/out/test/resources/log4j.properties     |  21 ++++++++
 .../offsetAndMetadataSerializedfile             | Bin 0 -> 144 bytes
 .../serializedData/topicPartitionSerializedfile | Bin 0 -> 125 bytes
 .../apache/kafka/common/network/Selector.java   |  18 ++++---
 .../kafka/common/network/SelectorTest.java      |  54 +++++++++++++++++++
 5 files changed, 86 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/out/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/clients/out/test/resources/log4j.properties b/clients/out/test/resources/log4j.properties
new file mode 100644
index 0000000..b1d5b7f
--- /dev/null
+++ b/clients/out/test/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+log4j.rootLogger=OFF, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+
+log4j.logger.org.apache.kafka=ERROR

http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile
----------------------------------------------------------------------
diff --git a/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile b/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile
new file mode 100644
index 0000000..95319cb
Binary files /dev/null and b/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/out/test/resources/serializedData/topicPartitionSerializedfile
----------------------------------------------------------------------
diff --git a/clients/out/test/resources/serializedData/topicPartitionSerializedfile b/clients/out/test/resources/serializedData/topicPartitionSerializedfile
new file mode 100644
index 0000000..2c1c501
Binary files /dev/null and b/clients/out/test/resources/serializedData/topicPartitionSerializedfile differ

http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 5dbe83b..da3de80 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -327,14 +327,16 @@ public class Selector implements Selectable, AutoCloseable {
             pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
         }
 
-        addToCompletedReceives();
-
         long endIo = time.nanoseconds();
         this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
 
         // we use the time at the end of select to ensure that we don't close any connections that
         // have just been processed in pollSelectionKeys
         maybeCloseOldestConnection(endSelect);
+
+        // Add to completedReceives after closing expired connections to avoid removing
+        // channels with completed receives until all staged receives are completed.
+        addToCompletedReceives();
     }
 
     private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
@@ -563,11 +565,7 @@ public class Selector implements Selectable, AutoCloseable {
         // are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering.
         Deque<NetworkReceive> deque = this.stagedReceives.get(channel);
         if (processOutstanding && deque != null && !deque.isEmpty()) {
-            if (!channel.isMute()) {
-                addToCompletedReceives(channel, deque);
-                if (deque.isEmpty())
-                    this.stagedReceives.remove(channel);
-            }
+            // stagedReceives will be moved to completedReceives later along with receives from other channels
             closingChannels.put(channel.id(), channel);
         } else
             doClose(channel, processOutstanding);
@@ -697,6 +695,12 @@ public class Selector implements Selectable, AutoCloseable {
         return new HashSet<>(nioSelector.keys());
     }
 
+    // only for testing
+    int numStagedReceives(KafkaChannel channel) {
+        Deque<NetworkReceive> deque = stagedReceives.get(channel);
+        return deque == null ? 0 : deque.size();
+    }
+
     private class SelectorMetrics {
         private final Metrics metrics;
         private final String metricGrpPrefix;

http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
index 33959fd..76ebb21 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java
@@ -17,6 +17,9 @@
 package org.apache.kafka.common.network;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.util.HashMap;
@@ -268,6 +271,57 @@ public class SelectorTest {
         assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id));
     }
 
+    @Test
+    public void testCloseOldestConnectionWithOneStagedReceive() throws Exception {
+        verifyCloseOldestConnectionWithStagedReceives(1);
+    }
+
+    @Test
+    public void testCloseOldestConnectionWithMultipleStagedReceives() throws Exception {
+        verifyCloseOldestConnectionWithStagedReceives(5);
+    }
+
+    private void verifyCloseOldestConnectionWithStagedReceives(int maxStagedReceives) throws Exception {
+        String id = "0";
+        blockingConnect(id);
+        KafkaChannel channel = selector.channel(id);
+
+        selector.mute(id);
+        for (int i = 0; i <= maxStagedReceives; i++) {
+            selector.send(createSend(id, String.valueOf(i)));
+            selector.poll(1000);
+        }
+
+        selector.unmute(id);
+        do {
+            selector.poll(1000);
+        } while (selector.completedReceives().isEmpty());
+
+        int stagedReceives = selector.numStagedReceives(channel);
+        int completedReceives = 0;
+        while (selector.disconnected().isEmpty()) {
+            time.sleep(6000); // The max idle time is 5000ms
+            selector.poll(0);
+            completedReceives += selector.completedReceives().size();
+            // With SSL, more receives may be staged from buffered data
+            int newStaged = selector.numStagedReceives(channel) - (stagedReceives - completedReceives);
+            if (newStaged > 0) {
+                stagedReceives += newStaged;
+                assertNotNull("Channel should not have been expired", selector.channel(id));
+                assertFalse("Channel should not have been disconnected", selector.disconnected().containsKey(id));
+            } else if (!selector.completedReceives().isEmpty()) {
+                assertEquals(1, selector.completedReceives().size());
+                assertTrue("Channel not found", selector.closingChannel(id) != null || selector.channel(id) != null);
+                assertFalse("Disconnect notified too early", selector.disconnected().containsKey(id));
+            }
+        }
+        assertEquals(maxStagedReceives, completedReceives);
+        assertEquals(stagedReceives, completedReceives);
+        assertNull("Channel not removed", selector.channel(id));
+        assertNull("Channel not removed", selector.closingChannel(id));
+        assertTrue("Disconnect not notified", selector.disconnected().containsKey(id));
+        assertTrue("Unexpected receive", selector.completedReceives().isEmpty());
+    }
 
     private String blockingRequest(String node, String s) throws IOException {
         selector.send(createSend(node, s));
