This is an automated email from the ASF dual-hosted git repository. sk0x50 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 19eaf1b IGNITE-13864 Fixed an issue where acknowledge on a stale latch could lead to assertion error. Fixes #8579 19eaf1b is described below commit 19eaf1b3a9fc5f1832d1290547e8e112e9b713a8 Author: vd_pyatkov <vldpyat...@gmail.com> AuthorDate: Tue Dec 22 23:47:01 2020 +0300 IGNITE-13864 Fixed an issue where acknowledge on a stale latch could lead to assertion error. Fixes #8579 Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com> --- .../dht/preloader/latch/ExchangeLatchManager.java | 25 +++-- .../preloader/latch/ExchangeLatchManagerTest.java | 106 +++++++++++++++++++++ 2 files changed, 121 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java index 92af6ae..e09f8a0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java @@ -161,7 +161,7 @@ public class ExchangeLatchManager { serverLatches.put(latchUid, latch); if (log.isDebugEnabled()) - log.debug("Server latch is created [latch=" + latchUid + ", participantsSize=" + participants.size() + "]"); + log.debug("Server latch is created [latch=" + latchUid + ", participantsSize=" + participants.size() + ']'); if (pendingAcks.containsKey(latchUid)) { Set<UUID> acks = pendingAcks.get(latchUid); @@ -195,7 +195,7 @@ public class ExchangeLatchManager { if (log.isDebugEnabled()) log.debug("Client latch is created [latch=" + latchUid + ", crd=" + coordinator - + ", participantsSize=" + participants.size() + "]"); + + ", participantsSize=" + participants.size() + ']'); clientLatches.put(latchUid, latch); @@ -392,9 +392,14 @@ public class ExchangeLatchManager { if (message.isFinal()) { if (log.isDebugEnabled()) - log.debug("Process final ack [latch=" + latchUid + ", from=" + from + "]"); + log.debug("Process final ack [latch=" + latchUid + ", from=" + from + ']'); - assert serverLatches.containsKey(latchUid) || clientLatches.containsKey(latchUid); + if (!serverLatches.containsKey(latchUid) && !clientLatches.containsKey(latchUid)) { + log.warning("Latch for this acknowledge is completed or never existed " + + "[latch=" + latchUid + ", from=" + from + ']'); + + return; + } if (clientLatches.containsKey(latchUid)) { ClientLatch latch = clientLatches.get(latchUid); @@ -404,7 +409,7 @@ public class ExchangeLatchManager { } else { if (log.isDebugEnabled()) - log.debug("Process ack [latch=" + latchUid + ", from=" + from + "]"); + log.debug("Process ack [latch=" + latchUid + ", from=" + from + ']'); if (serverLatches.containsKey(latchUid)) { ServerLatch latch = serverLatches.get(latchUid); @@ -516,7 +521,7 @@ public class ExchangeLatchManager { if (latch.hasParticipant(left.id()) && !latch.hasAck(left.id())) { if (log.isDebugEnabled()) - log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + "]"); + log.debug("Process node left [latch=" + latchEntry.getKey() + ", left=" + left.id() + ']'); latch.ack(left.id()); } @@ -553,7 +558,7 @@ public class ExchangeLatchManager { ); if (log.isDebugEnabled()) - log.debug("Ack has sent [latch=" + latchUid + ", final=" + finalAck + ", to=" + nodeId + "]"); + log.debug("Ack has sent [latch=" + latchUid + ", final=" + finalAck + ", to=" + nodeId + ']'); } } catch (IgniteCheckedException e) { @@ -612,7 +617,7 @@ public class ExchangeLatchManager { */ private void ack(UUID from) { if (log.isDebugEnabled()) - log.debug("Ack is accepted [latch=" + latchId() + ", from=" + from + "]"); + log.debug("Ack is accepted [latch=" + latchId() + ", from=" + from + ']'); countDown0(from); } @@ -631,7 +636,7 @@ public class ExchangeLatchManager { int remaining = permits.decrementAndGet(); if (log.isDebugEnabled()) - log.debug("Count down [latch=" + latchId() + ", remaining=" + remaining + "]"); + log.debug("Count down [latch=" + latchId() + ", remaining=" + remaining + ']'); if (remaining == 0) { complete(); @@ -700,7 +705,7 @@ public class ExchangeLatchManager { synchronized (this) { if (log.isDebugEnabled()) log.debug("Coordinator is changed [latch=" + latchId() + ", newCrd=" + coordinator.id() + - ", ackSent=" + ackSent + "]"); + ", ackSent=" + ackSent + ']'); this.coordinator = coordinator; diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java index c3394e2..9105c27 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java @@ -18,10 +18,20 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; +import org.apache.ignite.Ignite; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.internal.GridTopic; import org.apache.ignite.internal.IgniteEx; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.managers.communication.GridIoPolicy; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.ListeningTestLogger; +import org.apache.ignite.testframework.LogListener; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.junit.Test; @@ -32,6 +42,102 @@ public class ExchangeLatchManagerTest extends GridCommonAbstractTest { /** */ private static final String LATCH_NAME = "test"; + /** Message are meaning that node getting a stale acknowledge message. */ + private static final String STALE_ACK_LOG_MSG = "Latch for this acknowledge is completed or never existed"; + + /** Message happens when assertion was broken. */ + public static final Pattern ERROR_MSG = Pattern.compile("An error occurred processing the message.*" + + LatchAckMessage.class.getSimpleName()); + + /** Grid logger. */ + public ListeningTestLogger gridLogger; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { + return super.getConfiguration(igniteInstanceName) + .setGridLogger(gridLogger) + .setCommunicationSpi(new TestRecordingCommunicationSpi()); + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + super.beforeTest(); + + stopAllGrids(); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * Checks reaction of latch on stale acknowledge from new coordinator. + * + * @throws Exception If failed. + */ + @Test + public void testExcessAcknowledgeForNewCoordinator() throws Exception { + gridLogger = new ListeningTestLogger(log); + + LogListener staleMessageLsnr = LogListener.matches(STALE_ACK_LOG_MSG).build(); + LogListener errorLsnr = LogListener.matches(ERROR_MSG).build(); + + IgniteEx ignite0 = startGrids(3); + + awaitPartitionMapExchange(); + + TestRecordingCommunicationSpi spi0 = TestRecordingCommunicationSpi.spi(ignite0); + + spi0.blockMessages((node, msg) -> + msg instanceof LatchAckMessage && node.order() == 2); + + spi0.record((node, msg) -> + msg instanceof LatchAckMessage && node.order() == 3); + + Ignite ignite1 = G.allGrids().stream().filter(node -> node.cluster().localNode().order() == 2).findAny().get(); + + assertNotNull("Could not find node with second order.", ignite1); + + TestRecordingCommunicationSpi spi1 = TestRecordingCommunicationSpi.spi(ignite1); + + spi1.blockMessages((node, msg) -> { + if (msg instanceof LatchAckMessage && node.order() == 3) { + LatchAckMessage ack = (LatchAckMessage)msg; + + return ack.topVer().topologyVersion() == 3; + } + + return false; + }); + + IgniteInternalFuture exchangeDoingFut = GridTestUtils.runAsync(() -> + ignite0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME)) + ); + + spi0.waitForBlocked(); + spi0.waitForRecorded(); + + ignite0.close(); + + spi1.waitForBlocked(); + + awaitPartitionMapExchange(); + + assertTrue(exchangeDoingFut.isDone()); + + gridLogger.registerAllListeners(errorLsnr, staleMessageLsnr); + + spi1.stopBlock(); + + assertTrue(GridTestUtils.waitForCondition(() -> + staleMessageLsnr.check(), 10_000)); + + assertFalse(errorLsnr.check()); + } + /** * @throws Exception If failed. */