This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 19eaf1b  IGNITE-13864 Fixed an issue where acknowledge on a stale 
latch could lead to assertion error. Fixes #8579
19eaf1b is described below

commit 19eaf1b3a9fc5f1832d1290547e8e112e9b713a8
Author: vd_pyatkov <vldpyat...@gmail.com>
AuthorDate: Tue Dec 22 23:47:01 2020 +0300

    IGNITE-13864 Fixed an issue where acknowledge on a stale latch could lead 
to assertion error. Fixes #8579
    
    Signed-off-by: Slava Koptilin <slava.kopti...@gmail.com>
---
 .../dht/preloader/latch/ExchangeLatchManager.java  |  25 +++--
 .../preloader/latch/ExchangeLatchManagerTest.java  | 106 +++++++++++++++++++++
 2 files changed, 121 insertions(+), 10 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
index 92af6ae..e09f8a0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManager.java
@@ -161,7 +161,7 @@ public class ExchangeLatchManager {
         serverLatches.put(latchUid, latch);
 
         if (log.isDebugEnabled())
-            log.debug("Server latch is created [latch=" + latchUid + ", 
participantsSize=" + participants.size() + "]");
+            log.debug("Server latch is created [latch=" + latchUid + ", 
participantsSize=" + participants.size() + ']');
 
         if (pendingAcks.containsKey(latchUid)) {
             Set<UUID> acks = pendingAcks.get(latchUid);
@@ -195,7 +195,7 @@ public class ExchangeLatchManager {
         if (log.isDebugEnabled())
             log.debug("Client latch is created [latch=" + latchUid
                 + ", crd=" + coordinator
-                + ", participantsSize=" + participants.size() + "]");
+                + ", participantsSize=" + participants.size() + ']');
 
         clientLatches.put(latchUid, latch);
 
@@ -392,9 +392,14 @@ public class ExchangeLatchManager {
 
             if (message.isFinal()) {
                 if (log.isDebugEnabled())
-                    log.debug("Process final ack [latch=" + latchUid + ", 
from=" + from + "]");
+                    log.debug("Process final ack [latch=" + latchUid + ", 
from=" + from + ']');
 
-                assert serverLatches.containsKey(latchUid) || 
clientLatches.containsKey(latchUid);
+                if (!serverLatches.containsKey(latchUid) && 
!clientLatches.containsKey(latchUid)) {
+                    log.warning("Latch for this acknowledge is completed or 
never existed " +
+                        "[latch=" + latchUid + ", from=" + from + ']');
+
+                    return;
+                }
 
                 if (clientLatches.containsKey(latchUid)) {
                     ClientLatch latch = clientLatches.get(latchUid);
@@ -404,7 +409,7 @@ public class ExchangeLatchManager {
             }
             else {
                 if (log.isDebugEnabled())
-                    log.debug("Process ack [latch=" + latchUid + ", from=" + 
from + "]");
+                    log.debug("Process ack [latch=" + latchUid + ", from=" + 
from + ']');
 
                 if (serverLatches.containsKey(latchUid)) {
                     ServerLatch latch = serverLatches.get(latchUid);
@@ -516,7 +521,7 @@ public class ExchangeLatchManager {
 
                 if (latch.hasParticipant(left.id()) && 
!latch.hasAck(left.id())) {
                     if (log.isDebugEnabled())
-                        log.debug("Process node left [latch=" + 
latchEntry.getKey() + ", left=" + left.id() + "]");
+                        log.debug("Process node left [latch=" + 
latchEntry.getKey() + ", left=" + left.id() + ']');
 
                     latch.ack(left.id());
                 }
@@ -553,7 +558,7 @@ public class ExchangeLatchManager {
                 );
 
                 if (log.isDebugEnabled())
-                    log.debug("Ack has sent [latch=" + latchUid + ", final=" + 
finalAck + ", to=" + nodeId + "]");
+                    log.debug("Ack has sent [latch=" + latchUid + ", final=" + 
finalAck + ", to=" + nodeId + ']');
             }
         }
         catch (IgniteCheckedException e) {
@@ -612,7 +617,7 @@ public class ExchangeLatchManager {
          */
         private void ack(UUID from) {
             if (log.isDebugEnabled())
-                log.debug("Ack is accepted [latch=" + latchId() + ", from=" + 
from + "]");
+                log.debug("Ack is accepted [latch=" + latchId() + ", from=" + 
from + ']');
 
             countDown0(from);
         }
@@ -631,7 +636,7 @@ public class ExchangeLatchManager {
             int remaining = permits.decrementAndGet();
 
             if (log.isDebugEnabled())
-                log.debug("Count down [latch=" + latchId() + ", remaining=" + 
remaining + "]");
+                log.debug("Count down [latch=" + latchId() + ", remaining=" + 
remaining + ']');
 
             if (remaining == 0) {
                 complete();
@@ -700,7 +705,7 @@ public class ExchangeLatchManager {
             synchronized (this) {
                 if (log.isDebugEnabled())
                     log.debug("Coordinator is changed [latch=" + latchId() + 
", newCrd=" + coordinator.id() +
-                        ", ackSent=" + ackSent + "]");
+                        ", ackSent=" + ackSent + ']');
 
                 this.coordinator = coordinator;
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
index c3394e2..9105c27 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/latch/ExchangeLatchManagerTest.java
@@ -18,10 +18,20 @@
 package 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch;
 
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.managers.communication.GridIoPolicy;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.ListeningTestLogger;
+import org.apache.ignite.testframework.LogListener;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.junit.Test;
 
@@ -32,6 +42,102 @@ public class ExchangeLatchManagerTest extends 
GridCommonAbstractTest {
     /** */
     private static final String LATCH_NAME = "test";
 
+    /** Log message indicating that a node received a stale acknowledge message. */
+    private static final String STALE_ACK_LOG_MSG = "Latch for this 
acknowledge is completed or never existed";
+
+    /** Pattern of the error logged when processing a latch acknowledge message fails (e.g. on a broken assertion). */
+    public static final Pattern ERROR_MSG = Pattern.compile("An error occurred 
processing the message.*"
+        + LatchAckMessage.class.getSimpleName());
+
+    /** Grid logger. */
+    public ListeningTestLogger gridLogger;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setGridLogger(gridLogger)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi());
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * Checks reaction of latch on stale acknowledge from new coordinator.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testExcessAcknowledgeForNewCoordinator() throws Exception {
+        gridLogger = new ListeningTestLogger(log);
+
+        LogListener staleMessageLsnr = 
LogListener.matches(STALE_ACK_LOG_MSG).build();
+        LogListener errorLsnr = LogListener.matches(ERROR_MSG).build();
+
+        IgniteEx ignite0 = startGrids(3);
+
+        awaitPartitionMapExchange();
+
+        TestRecordingCommunicationSpi spi0 = 
TestRecordingCommunicationSpi.spi(ignite0);
+
+        spi0.blockMessages((node, msg) ->
+            msg instanceof LatchAckMessage && node.order() == 2);
+
+        spi0.record((node, msg) ->
+            msg instanceof LatchAckMessage && node.order() == 3);
+
+        Ignite ignite1 = G.allGrids().stream().filter(node -> 
node.cluster().localNode().order() == 2).findAny().get();
+
+        assertNotNull("Could not find node with second order.", ignite1);
+
+        TestRecordingCommunicationSpi spi1 = 
TestRecordingCommunicationSpi.spi(ignite1);
+
+        spi1.blockMessages((node, msg) -> {
+            if (msg instanceof LatchAckMessage && node.order() == 3) {
+                LatchAckMessage ack = (LatchAckMessage)msg;
+
+                return ack.topVer().topologyVersion() == 3;
+            }
+
+            return false;
+        });
+
+        IgniteInternalFuture exchangeDoingFut = GridTestUtils.runAsync(() ->
+            ignite0.createCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME))
+        );
+
+        spi0.waitForBlocked();
+        spi0.waitForRecorded();
+
+        ignite0.close();
+
+        spi1.waitForBlocked();
+
+        awaitPartitionMapExchange();
+
+        assertTrue(exchangeDoingFut.isDone());
+
+        gridLogger.registerAllListeners(errorLsnr, staleMessageLsnr);
+
+        spi1.stopBlock();
+
+        assertTrue(GridTestUtils.waitForCondition(() ->
+            staleMessageLsnr.check(), 10_000));
+
+        assertFalse(errorLsnr.check());
+    }
+
     /**
      * @throws Exception If failed.
      */

Reply via email to