This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 8e154f0 IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824. 8e154f0 is described below commit 8e154f0d7eee5825c82e6158f3dac725d7bfe847 Author: zstan <stanilov...@gmail.com> AuthorDate: Tue Mar 16 12:12:26 2021 +0300 IGNITE-14224 Extended logging on closing connection to failed client - Fixes #8824. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../apache/ignite/internal/util/IgniteUtils.java | 28 ++++++- .../ignite/spi/discovery/tcp/ServerImpl.java | 10 +-- .../ignite/internal/IgniteClientFailuresTest.java | 97 ++++++++++++++++++---- ...cpClientDiscoverySpiFailureTimeoutSelfTest.java | 1 + .../processors/cache/index/IndexMetricsTest.java | 6 +- 5 files changed, 120 insertions(+), 22 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 5d5c360..ff71dff 100755 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -4218,6 +4218,32 @@ public abstract class IgniteUtils { } /** + * Closes given socket logging possible checked exception. + * + * @param sock Socket to close. If it's {@code null} - it's no-op. + * @param log Logger to log possible checked exception with (optional). + */ + public static void close(@Nullable Socket sock, @Nullable IgniteLogger log) { + if (sock != null) { + try { + // Avoid tls 1.3 incompatibility https://bugs.openjdk.java.net/browse/JDK-8208526 + sock.shutdownOutput(); + sock.shutdownInput(); + } + catch (Exception e) { + warn(log, "Failed to shutdown socket: " + e.getMessage(), e); + } + + try { + sock.close(); + } + catch (Exception e) { + warn(log, "Failed to close socket: " + e.getMessage(), e); + } + } + } + + /** * Closes given resource suppressing possible checked exception. * * @param rsrc Resource to close. If it's {@code null} - it's no-op. @@ -4372,7 +4398,7 @@ public abstract class IgniteUtils { return; try { - // Avoid java 12 bug see https://bugs.openjdk.java.net/browse/JDK-8219658 + // Avoid tls 1.3 incompatibility https://bugs.openjdk.java.net/browse/JDK-8208526 sock.shutdownOutput(); sock.shutdownInput(); } diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index a2ebbda..31fe6a4 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -6803,9 +6803,7 @@ class ServerImpl extends TcpDiscoveryImpl { log.debug("Unknown connection detected (is some other software connecting to " + "this Ignite port?" + (!spi.isSslEnabled() ? " missed SSL configuration?" : "" ) + - ") " + - "[rmtAddr=" + rmtAddr + - ", locAddr=" + sock.getLocalSocketAddress() + ']'); + ") [rmtAddr=" + rmtAddr + ", locAddr=" + sock.getLocalSocketAddress() + ']'); LT.warn(log, "Unknown connection detected (is some other software connecting to " + "this Ignite port?" + @@ -7364,11 +7362,11 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(clientMsgWrk.runner()); } - U.closeQuiet(sock); + U.close(sock, log); if (log.isInfoEnabled()) log.info("Finished serving remote node connection [rmtAddr=" + rmtAddr + - ", rmtPort=" + sock.getPort()); + ", rmtPort=" + sock.getPort() + ", rmtNodeId=" + nodeId + ']'); if (isLocalNodeCoordinator() && !ring.hasRemoteServerNodes()) U.enhanceThreadName(msgWorkerThread, "crd"); @@ -7862,7 +7860,7 @@ class ServerImpl extends TcpDiscoveryImpl { U.interrupt(runner()); - U.closeQuiet(sock); + U.close(sock, log); } } } diff --git a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java index 753f09c..1ebfa55 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/IgniteClientFailuresTest.java @@ -14,13 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.ignite.internal; import org.apache.ignite.IgniteCache; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.FailureType; import org.apache.ignite.internal.cluster.IgniteClusterEx; import org.apache.ignite.internal.managers.GridManagerAdapter; +import org.apache.ignite.mxbean.ClusterMetricsMXBean; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; import org.apache.ignite.testframework.GridStringLogger; @@ -36,17 +39,24 @@ import org.junit.Test; */ public class IgniteClientFailuresTest extends GridCommonAbstractTest { /** */ + private static final String EXCHANGE_WORKER_BLOCKED_MSG = "threadName=exchange-worker, blockedFor="; + + /** */ private GridStringLogger inMemoryLog; /** {@inheritDoc} */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); - if (!igniteInstanceName.startsWith("client")) { + if (igniteInstanceName.contains("client")) + cfg.setClientMode(true); + else { cfg.setClientFailureDetectionTimeout(10_000); cfg.setSystemWorkerBlockedTimeout(5_000); + cfg.setNetworkTimeout(5_000); + cfg.setGridLogger(inMemoryLog); } @@ -73,13 +83,17 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { */ @Test public void testNoMessagesFromFailureProcessor() throws Exception { - inMemoryLog = new GridStringLogger(false, new GridTestLog4jLogger()); + GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger()); + + strLog.logLength(1024 * 1024); - inMemoryLog.logLength(1024 * 1024); + inMemoryLog = strLog; IgniteEx srv = startGrid(0); - IgniteEx client00 = startClientGrid("client00"); + inMemoryLog = null; + + IgniteEx client00 = startGrid("client00"); client00.getOrCreateCache(new CacheConfiguration<>("cache0")); @@ -93,7 +107,7 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { assertTrue(waitRes); - assertFalse(inMemoryLog.toString().contains("name=tcp-comm-worker")); + assertFalse(strLog.toString().contains("name=tcp-comm-worker")); } /** @@ -104,30 +118,85 @@ public class IgniteClientFailuresTest extends GridCommonAbstractTest { */ @Test public void testFailedClientLeavesTopologyAfterTimeout() throws Exception { + IgniteEx srv0 = (IgniteEx)startGridsMultiThreaded(3); + + IgniteEx client00 = startGrid("client00"); + IgniteEx client01 = startGrid("client01"); + + client00.getOrCreateCache(new CacheConfiguration<>("cache0")); + client01.getOrCreateCache(new CacheConfiguration<>("cache1")); + + IgniteInternalFuture f1 = GridTestUtils.runAsync(() -> breakClient(client00)); + IgniteInternalFuture f2 = GridTestUtils.runAsync(() -> breakClient(client01)); + + f1.get(); f2.get(); + + final IgniteClusterEx cl = srv0.cluster(); + + assertEquals(5, cl.topology(cl.topologyVersion()).size()); + + IgniteEx client02 = startGrid("client02"); + + assertEquals(6, cl.topology(cl.topologyVersion()).size()); + + boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 4), + 20_000); + + assertTrue(waitRes); + + checkCacheOperations(client02.cache("cache0")); + + assertEquals(4, srv0.context().discovery().allNodes().size()); + + // Cluster metrics. + ClusterMetricsMXBean mxBeanCluster = GridCommonAbstractTest.getMxBean(srv0.name(), "Kernal", + ClusterMetricsMXBeanImpl.class.getSimpleName(), ClusterMetricsMXBean.class); + + assertEquals(1, mxBeanCluster.getTotalClientNodes()); + } + + /** + * Test verifies that when some sys thread (on server node) tries to re-establish connection to failed client + * and exchange-worker gets blocked waiting for it (e.g. to send partitions full map) + * it is not treated as {@link FailureType#SYSTEM_WORKER_BLOCKED} + * because this waiting is finite and part of normal operations. + * + * @throws Exception If failed. + */ + @Test + public void testExchangeWorkerIsNotTreatedAsBlockedWhenClientNodeFails() throws Exception { + GridStringLogger strLog = new GridStringLogger(false, new GridTestLog4jLogger()); + + strLog.logLength(1024 * 1024); + + inMemoryLog = strLog; + IgniteEx srv0 = startGrid(0); - IgniteEx client00 = startClientGrid("client00"); + inMemoryLog = null; - Thread.sleep(5_000); + IgniteEx client00 = startGrid("client00"); client00.getOrCreateCache(new CacheConfiguration<>("cache0")); + startGrid(1); + breakClient(client00); final IgniteClusterEx cl = srv0.cluster(); - assertEquals(2, cl.topology(cl.topologyVersion()).size()); - - IgniteEx client01 = startClientGrid("client01"); - assertEquals(3, cl.topology(cl.topologyVersion()).size()); - boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 2), - 20_000); + startGrid("client01"); - checkCacheOperations(client01.cache("cache0")); + boolean waitRes = GridTestUtils.waitForCondition(() -> (cl.topology(cl.topologyVersion()).size() == 3), + 20_000); assertTrue(waitRes); + + String logRes = strLog.toString(); + + assertFalse(logRes.contains(EXCHANGE_WORKER_BLOCKED_MSG)); } /** */ diff --git a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java index bd0b9b7..4fc6b33 100644 --- a/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/spi/discovery/tcp/TcpClientDiscoverySpiFailureTimeoutSelfTest.java @@ -443,6 +443,7 @@ public class TcpClientDiscoverySpiFailureTimeoutSelfTest extends TcpClientDiscov /** */ private volatile long readDelay; + /** */ private volatile long writeToSocketDelay; /** */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java index 62da827..2fd14ef 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/index/IndexMetricsTest.java @@ -265,7 +265,11 @@ public class IndexMetricsTest extends AbstractIndexingCommonTest { * @param cls Cache metrics MXBean implementation. * @return Cache metrics MXBean. */ - private <T extends CacheMetricsMXBean> T cacheMetricsMXBean(IgniteEx n, String cacheName, Class<? super T> cls) { + private <T extends CacheMetricsMXBean> T cacheMetricsMXBean( + IgniteEx n, + String cacheName, + Class<? super T> cls + ) { requireNonNull(n); requireNonNull(cacheName); requireNonNull(cls);