Repository: kafka Updated Branches: refs/heads/trunk bd8681cdd -> b28bc57a1
MINOR: Improve handling of channel close exception Propagate IOException in SslTransportLayer channel.close to be consistent with PlaintextTransportLayer, close authenticator on channel close even if transport layer close fails. Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #1370 from rajinisivaram/minor-channelclose2 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b28bc57a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b28bc57a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b28bc57a Branch: refs/heads/trunk Commit: b28bc57a1fdb9b56c89c3cb9c3df60afbeda521c Parents: bd8681c Author: Rajini Sivaram <[email protected]> Authored: Wed May 11 21:11:17 2016 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed May 11 21:11:17 2016 +0100 ---------------------------------------------------------------------- .../kafka/common/network/Authenticator.java | 10 +-- .../kafka/common/network/KafkaChannel.java | 5 +- .../kafka/common/network/SslTransportLayer.java | 9 +-- .../org/apache/kafka/common/utils/Utils.java | 23 ++++++ .../apache/kafka/common/utils/UtilsTest.java | 80 ++++++++++++++++++++ 5 files changed, 112 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java index 6f01fe5..0012f15 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Authenticator.java @@ -17,6 +17,7 @@ package org.apache.kafka.common.network; +import java.io.Closeable; import java.io.IOException; import java.util.Map; import java.security.Principal; @@ -27,7 +28,7 @@ import org.apache.kafka.common.KafkaException; /** * Authentication for Channel */ -public interface Authenticator { +public interface Authenticator extends Closeable { /** * Configures Authenticator using the provided parameters. @@ -54,11 +55,4 @@ public interface Authenticator { */ boolean complete(); - /** - * Closes this Authenticator - * - * @throws IOException if any I/O error occurs - */ - void close() throws IOException; - } http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index f72f91b..16002eb 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -26,6 +26,8 @@ import java.nio.channels.SelectionKey; import java.security.Principal; +import org.apache.kafka.common.utils.Utils; + public class KafkaChannel { private final String id; private final TransportLayer transportLayer; @@ -42,8 +44,7 @@ public class KafkaChannel { } public void close() throws IOException { - transportLayer.close(); - authenticator.close(); + Utils.closeAll(transportLayer, authenticator); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java index d18d6b7..cfd618d 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java +++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java @@ -141,7 +141,7 @@ public class SslTransportLayer implements TransportLayer { * Sends a SSL close message and closes socketChannel. */ @Override - public void close() { + public void close() throws IOException { if (closing) return; closing = true; sslEngine.closeOutbound(); @@ -168,12 +168,11 @@ public class SslTransportLayer implements TransportLayer { try { socketChannel.socket().close(); socketChannel.close(); - } catch (IOException e) { - log.warn("Failed to close SSL socket channel: " + e); + } finally { + key.attach(null); + key.cancel(); } } - key.attach(null); - key.cancel(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index bd173ed..e740618 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -14,6 +14,7 @@ package org.apache.kafka.common.utils; import java.io.IOException; import java.io.InputStream; +import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.OutputStream; @@ -676,4 +677,26 @@ public class Utils { } } + /** + * Closes all the provided closeables. + * @throws IOException if any of the close methods throws an IOException. + * The first IOException is thrown with subsequent exceptions + * added as suppressed exceptions. + */ + public static void closeAll(Closeable... closeables) throws IOException { + IOException exception = null; + for (Closeable closeable : closeables) { + try { + closeable.close(); + } catch (IOException e) { + if (exception != null) + exception.addSuppressed(e); + else + exception = e; + } + } + if (exception != null) + throw exception; + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/b28bc57a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 1078578..1af7e43 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -18,6 +18,8 @@ package org.apache.kafka.common.utils; import java.util.Arrays; import java.util.Collections; +import java.io.Closeable; +import java.io.IOException; import java.nio.ByteBuffer; import org.junit.Test; @@ -26,6 +28,8 @@ import static org.apache.kafka.common.utils.Utils.getHost; import static org.apache.kafka.common.utils.Utils.getPort; import static org.apache.kafka.common.utils.Utils.formatAddress; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class UtilsTest { @@ -114,4 +118,80 @@ public class UtilsTest { assertEquals(1, Utils.min(2, 1, 3)); assertEquals(1, Utils.min(2, 3, 1)); } + + @Test + public void testCloseAll() { + TestCloseable[] closeablesWithoutException = TestCloseable.createCloseables(false, false, false); + try { + Utils.closeAll(closeablesWithoutException); + TestCloseable.checkClosed(closeablesWithoutException); + } catch (IOException e) { + fail("Unexpected exception: " + e); + } + + TestCloseable[] closeablesWithException = TestCloseable.createCloseables(true, true, true); + try { + Utils.closeAll(closeablesWithException); + fail("Expected exception not thrown"); + } catch (IOException e) { + TestCloseable.checkClosed(closeablesWithException); + TestCloseable.checkException(e, closeablesWithException); + } + + TestCloseable[] singleExceptionCloseables = TestCloseable.createCloseables(false, true, false); + try { + Utils.closeAll(singleExceptionCloseables); + fail("Expected exception not thrown"); + } catch (IOException e) { + TestCloseable.checkClosed(singleExceptionCloseables); + TestCloseable.checkException(e, singleExceptionCloseables[1]); + } + + TestCloseable[] mixedCloseables = TestCloseable.createCloseables(false, true, false, true, true); + try { + Utils.closeAll(mixedCloseables); + fail("Expected exception not thrown"); + } catch (IOException e) { + TestCloseable.checkClosed(mixedCloseables); + TestCloseable.checkException(e, mixedCloseables[1], mixedCloseables[3], mixedCloseables[4]); + } + } + + private static class TestCloseable implements Closeable { + private final int id; + private final IOException closeException; + private boolean closed; + + TestCloseable(int id, boolean exceptionOnClose) { + this.id = id; + this.closeException = exceptionOnClose ? new IOException("Test close exception " + id) : null; + } + + @Override + public void close() throws IOException { + closed = true; + if (closeException != null) + throw closeException; + } + + static TestCloseable[] createCloseables(boolean... exceptionOnClose) { + TestCloseable[] closeables = new TestCloseable[exceptionOnClose.length]; + for (int i = 0; i < closeables.length; i++) + closeables[i] = new TestCloseable(i, exceptionOnClose[i]); + return closeables; + } + + static void checkClosed(TestCloseable... closeables) { + for (TestCloseable closeable : closeables) + assertTrue("Close not invoked for " + closeable.id, closeable.closed); + } + + static void checkException(IOException e, TestCloseable... closeablesWithException) { + assertEquals(closeablesWithException[0].closeException, e); + Throwable[] suppressed = e.getSuppressed(); + assertEquals(closeablesWithException.length - 1, suppressed.length); + for (int i = 1; i < closeablesWithException.length; i++) + assertEquals(closeablesWithException[i].closeException, suppressed[i - 1]); + } + } }
