Repository: kafka Updated Branches: refs/heads/0.11.0 3a515505e -> 693318d2a
KAFKA-5587; Remove channel only after staged receives are delivered When idle connections are closed, ensure that channels with staged receives are retained in `closingChannels` until all staged receives are completed. Also ensure that only one staged receive is completed in each poll, even when channels are closed. Author: Rajini Sivaram <rajinisiva...@googlemail.com> Reviewers: Jun Rao <jun...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #3526 from rajinisivaram/KAFKA-5587 (cherry picked from commit 28c83d9667676515607713d4ccfd3757a8afcba2) Signed-off-by: Ismael Juma <ism...@juma.me.uk> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/693318d2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/693318d2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/693318d2 Branch: refs/heads/0.11.0 Commit: 693318d2aeaa89f68acccbd5ac1618f85a08130a Parents: 3a51550 Author: Rajini Sivaram <rajinisiva...@googlemail.com> Authored: Mon Jul 17 20:29:26 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Tue Jul 18 00:01:18 2017 +0100 ---------------------------------------------------------------------- clients/out/test/resources/log4j.properties | 21 ++++++++ .../offsetAndMetadataSerializedfile | Bin 0 -> 144 bytes .../serializedData/topicPartitionSerializedfile | Bin 0 -> 125 bytes .../apache/kafka/common/network/Selector.java | 18 ++++--- .../kafka/common/network/SelectorTest.java | 54 +++++++++++++++++++ 5 files changed, 86 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/out/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/clients/out/test/resources/log4j.properties b/clients/out/test/resources/log4j.properties new file mode 100644 index 0000000..b1d5b7f --- /dev/null +++ b/clients/out/test/resources/log4j.properties @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +log4j.rootLogger=OFF, stdout + +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n + +log4j.logger.org.apache.kafka=ERROR http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile ---------------------------------------------------------------------- diff --git a/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile b/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile new file mode 100644 index 0000000..95319cb Binary files /dev/null and b/clients/out/test/resources/serializedData/offsetAndMetadataSerializedfile differ http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/out/test/resources/serializedData/topicPartitionSerializedfile ---------------------------------------------------------------------- diff --git a/clients/out/test/resources/serializedData/topicPartitionSerializedfile b/clients/out/test/resources/serializedData/topicPartitionSerializedfile new file mode 100644 index 0000000..2c1c501 Binary files /dev/null and b/clients/out/test/resources/serializedData/topicPartitionSerializedfile differ http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/src/main/java/org/apache/kafka/common/network/Selector.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 5dbe83b..da3de80 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -327,14 +327,16 @@ public class Selector implements Selectable, AutoCloseable { pollSelectionKeys(immediatelyConnectedKeys, true, endSelect); } - addToCompletedReceives(); - long endIo = time.nanoseconds(); this.sensors.ioTime.record(endIo - endSelect, time.milliseconds()); // we use the time at the end of select to ensure that we don't close any connections that // have just been processed in pollSelectionKeys maybeCloseOldestConnection(endSelect); + + // Add to completedReceives after closing expired connections to avoid removing + // channels with completed receives until all staged receives are completed. + addToCompletedReceives(); } private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys, @@ -563,11 +565,7 @@ public class Selector implements Selectable, AutoCloseable { // are tracked to ensure that requests are processed one-by-one by the broker to preserve ordering. Deque<NetworkReceive> deque = this.stagedReceives.get(channel); if (processOutstanding && deque != null && !deque.isEmpty()) { - if (!channel.isMute()) { - addToCompletedReceives(channel, deque); - if (deque.isEmpty()) - this.stagedReceives.remove(channel); - } + // stagedReceives will be moved to completedReceives later along with receives from other channels closingChannels.put(channel.id(), channel); } else doClose(channel, processOutstanding); @@ -697,6 +695,12 @@ public class Selector implements Selectable, AutoCloseable { return new HashSet<>(nioSelector.keys()); } + // only for testing + int numStagedReceives(KafkaChannel channel) { + Deque<NetworkReceive> deque = stagedReceives.get(channel); + return deque == null ? 0 : deque.size(); + } + private class SelectorMetrics { private final Metrics metrics; private final String metricGrpPrefix; http://git-wip-us.apache.org/repos/asf/kafka/blob/693318d2/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java index 33959fd..76ebb21 100644 --- a/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java +++ b/clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java @@ -17,6 +17,9 @@ package org.apache.kafka.common.network; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.HashMap; @@ -268,6 +271,57 @@ public class SelectorTest { assertEquals(ChannelState.EXPIRED, selector.disconnected().get(id)); } + @Test + public void testCloseOldestConnectionWithOneStagedReceive() throws Exception { + verifyCloseOldestConnectionWithStagedReceives(1); + } + + @Test + public void testCloseOldestConnectionWithMultipleStagedReceives() throws Exception { + verifyCloseOldestConnectionWithStagedReceives(5); + } + + private void verifyCloseOldestConnectionWithStagedReceives(int maxStagedReceives) throws Exception { + String id = "0"; + blockingConnect(id); + KafkaChannel channel = selector.channel(id); + + selector.mute(id); + for (int i = 0; i <= maxStagedReceives; i++) { + selector.send(createSend(id, String.valueOf(i))); + selector.poll(1000); + } + + selector.unmute(id); + do { + selector.poll(1000); + } while (selector.completedReceives().isEmpty()); + + int stagedReceives = selector.numStagedReceives(channel); + int completedReceives = 0; + while (selector.disconnected().isEmpty()) { + time.sleep(6000); // The max idle time is 5000ms + selector.poll(0); + completedReceives += selector.completedReceives().size(); + // With SSL, more receives may be staged from buffered data + int newStaged = selector.numStagedReceives(channel) - (stagedReceives - completedReceives); + if (newStaged > 0) { + stagedReceives += newStaged; + assertNotNull("Channel should not have been expired", selector.channel(id)); + assertFalse("Channel should not have been disconnected", selector.disconnected().containsKey(id)); + } else if (!selector.completedReceives().isEmpty()) { + assertEquals(1, selector.completedReceives().size()); + assertTrue("Channel not found", selector.closingChannel(id) != null || selector.channel(id) != null); + assertFalse("Disconnect notified too early", selector.disconnected().containsKey(id)); + } + } + assertEquals(maxStagedReceives, completedReceives); + assertEquals(stagedReceives, completedReceives); + assertNull("Channel not removed", selector.channel(id)); + assertNull("Channel not removed", selector.closingChannel(id)); + assertTrue("Disconnect not notified", selector.disconnected().containsKey(id)); + assertTrue("Unexpected receive", selector.completedReceives().isEmpty()); + } private String blockingRequest(String node, String s) throws IOException { selector.send(createSend(node, s));