This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 1bf63b7 IGNITE-15867 Socket shutdown called twice in GridNioServer 1bf63b7 is described below commit 1bf63b7b69546e282c9b631516effe8d2b1a656f Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com> AuthorDate: Thu Dec 16 11:21:00 2021 +0300 IGNITE-15867 Socket shutdown called twice in GridNioServer - explain that the EchoServer must be operated in the same thread in which it is started - Fixes #9651. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../apache/ignite/internal/util/IgniteUtils.java | 9 +- .../apache/ignite/internal/util/EchoServer.java | 139 +++++++++++++++++++++ .../ignite/internal/util/IgniteUtilsUnitTest.java | 72 +++++++++++ .../internal/util/nio/GridNioServerTest.java | 95 ++++++++++++++ .../ignite/testsuites/IgniteUtilSelfTestSuite.java | 4 + 5 files changed, 316 insertions(+), 3 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index f3c0d8b..acacde4 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -4245,9 +4245,12 @@ public abstract class IgniteUtils { return; try { - // Avoid tls 1.3 incompatibility https://bugs.openjdk.java.net/browse/JDK-8208526 - sock.shutdownOutput(); - sock.shutdownInput(); + // Closing output and input first to avoid tls 1.3 incompatibility + // https://bugs.openjdk.java.net/browse/JDK-8208526 + if (!sock.isOutputShutdown()) + sock.shutdownOutput(); + if (!sock.isInputShutdown()) + sock.shutdownInput(); } catch (ClosedChannelException | SocketException ex) { LT.warn(log, "Failed to shutdown socket", ex); diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/EchoServer.java b/modules/core/src/test/java/org/apache/ignite/internal/util/EchoServer.java new file mode 100644 index 0000000..21cc73c --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/EchoServer.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.ServerSocket; +import java.net.Socket; +import java.net.SocketAddress; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +/** + * A simple TCP server that echoes back every byte that it receives. Can be used when some protocol-neutral property + * of a TCP client (like socket closure handling) needs to be tested. + * + * This server must be operated (closed, address obtained, etc) from the same thread in which it was started. + */ +class EchoServer implements AutoCloseable { + /***/ + private final int port; + + /***/ + private final ExecutorService acceptorExecutor = Executors.newSingleThreadExecutor(); + + /***/ + private final ExecutorService workersExecutor = Executors.newCachedThreadPool(); + + /***/ + private ServerSocket serverSocket; + + /***/ + private volatile boolean running; + + /***/ + EchoServer(int port) { + this.port = port; + } + + /***/ + public void start() throws IOException { + running = true; + + serverSocket = new ServerSocket(port); + + acceptorExecutor.submit(new Acceptor()); + } + + /***/ + public void stop() throws IOException { + assert running : "Not started yet"; + + running = false; + + serverSocket.close(); + + IgniteUtils.shutdownNow(getClass(), acceptorExecutor, null); + IgniteUtils.shutdownNow(getClass(), workersExecutor, null); + } + + /***/ + public SocketAddress localSocketAddress() { + assert serverSocket != null; + + return serverSocket.getLocalSocketAddress(); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + stop(); + } + + /***/ + private class Acceptor implements Runnable { + /** {@inheritDoc} */ + @Override public void run() { + while (running) { + Socket socket = acceptConnection(); + workersExecutor.submit(new Worker(socket)); + } + } + + /***/ + private Socket acceptConnection() { + try { + return serverSocket.accept(); + } + catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } + + /***/ + private class Worker implements Runnable { + /***/ + private final Socket socket; + + /***/ + private Worker(Socket socket) { + this.socket = socket; + } + + /** {@inheritDoc} */ + @Override public void run() { + try (Socket ignored = socket) { + InputStream is = socket.getInputStream(); + OutputStream os = socket.getOutputStream(); + + while (running) { + int ch = is.read(); + if (ch < 0) { + break; + } + os.write(ch); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsUnitTest.java new file mode 100644 index 0000000..fd8c237 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteUtilsUnitTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util; + +import java.io.IOException; +import java.nio.channels.SocketChannel; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.junit.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.is; + +/** + * Unit tests for {@link IgniteUtils}. + */ +public class IgniteUtilsUnitTest { + /***/ + private static final int PORT = 5555; + + /***/ + private final List<String> logMessages = new CopyOnWriteArrayList<>(); + + /***/ + @Test + public void shouldNotProduceWarningsWhenClosingAnAlreadyClosedSocket() throws Exception { + try (EchoServer server = new EchoServer(PORT)) { + server.start(); + + try (SocketChannel channel = connectTo(server)) { + // closing first time + channel.close(); + + // now close second time and collect logs + IgniteUtils.close(channel.socket(), logMessagesCollector()); + } + } + + assertThat(logMessages, is(empty())); + } + + /***/ + private SocketChannel connectTo(EchoServer server) throws IOException { + return SocketChannel.open(server.localSocketAddress()); + } + + /***/ + private ListeningTestLogger logMessagesCollector() { + ListeningTestLogger log = new ListeningTestLogger(); + + log.registerListener(logMessages::add); + + return log; + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioServerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioServerTest.java new file mode 100644 index 0000000..9020ca8 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioServerTest.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.util.nio; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.Socket; +import java.net.UnknownHostException; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.not; + +/** + * Unit tests for {@link GridNioServer}. + */ +@RunWith(MockitoJUnitRunner.class) +public class GridNioServerTest { + /***/ + private final List<String> logMessages = new CopyOnWriteArrayList<>(); + + /***/ + private static final int PORT = 5555; + + /***/ + @Mock + private GridNioServerListener<Object> noOpListener; + + /***/ + @Test + public void shouldNotLogWarningsOnKeyClose() throws Exception { + GridNioServer<Object> server = startServerCollectingLogMessages(); + + try (Socket ignored = openSocketTo(server)) { + server.stop(); + } + + assertThat(logMessages, not(hasItem(containsString("Failed to shutdown socket")))); + assertThat(logMessages, not(hasItem(containsString("ClosedChannelException")))); + } + + /***/ + private GridNioServer<Object> startServerCollectingLogMessages() throws IgniteCheckedException, + UnknownHostException { + GridNioServer<Object> server = GridNioServer.builder() + .address(InetAddress.getLocalHost()) + .port(PORT) + .selectorCount(1) + .listener(noOpListener) + .logger(logMessagesCollector()) + .build(); + + server.start(); + + return server; + } + + /***/ + private ListeningTestLogger logMessagesCollector() { + ListeningTestLogger log = new ListeningTestLogger(); + + log.registerListener(logMessages::add); + + return log; + } + + /***/ + private Socket openSocketTo(GridNioServer<Object> server) throws IOException { + return new Socket(server.localAddress().getAddress(), server.port()); + } +} diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java index baa7730..5126a47 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java @@ -28,8 +28,10 @@ import org.apache.ignite.internal.util.HostAndPortRangeTest; import org.apache.ignite.internal.util.IgniteDevOnlyLogTest; import org.apache.ignite.internal.util.IgniteExceptionRegistrySelfTest; import org.apache.ignite.internal.util.IgniteUtilsSelfTest; +import org.apache.ignite.internal.util.IgniteUtilsUnitTest; import org.apache.ignite.internal.util.nio.GridNioDelimitedBufferSelfTest; import org.apache.ignite.internal.util.nio.GridNioSelfTest; +import org.apache.ignite.internal.util.nio.GridNioServerTest; import org.apache.ignite.internal.util.nio.GridNioSessionMetaKeySelfTest; import org.apache.ignite.internal.util.nio.GridNioSslSelfTest; import org.apache.ignite.internal.util.nio.impl.GridNioFilterChainSelfTest; @@ -80,6 +82,7 @@ import org.junit.runners.Suite; GridThreadPoolExecutorServiceSelfTest.class, IgniteThreadPoolSizeTest.class, IgniteUtilsSelfTest.class, + IgniteUtilsUnitTest.class, IgniteVersionUtilsSelfTest.class, GridSpinReadWriteLockSelfTest.class, GridQueueSelfTest.class, @@ -128,6 +131,7 @@ import org.junit.runners.Suite; // NIO. GridNioSessionMetaKeySelfTest.class, GridNioSelfTest.class, + GridNioServerTest.class, GridNioFilterChainSelfTest.class, GridNioSslSelfTest.class, GridNioDelimitedBufferSelfTest.class,