This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 8113ec0  IGNITE-14375 Pending messages can be erroneously send (#8943)
8113ec0 is described below

commit 8113ec0c6edfea35e57cfe17333041ba34ccc108
Author: Evgeniy Stanilovskiy <stanilov...@gmail.com>
AuthorDate: Fri Apr 2 11:00:06 2021 +0300

    IGNITE-14375 Pending messages can be erroneously send (#8943)
---
 .../cache/CacheAffinitySharedManager.java          |  4 +-
 .../cache/DynamicCacheChangeRequest.java           |  2 +-
 .../internal/processors/cache/ExchangeActions.java |  4 +-
 .../cache/GatewayProtectedCacheProxy.java          |  2 +-
 .../processors/cache/GridCacheMapEntry.java        | 12 ++--
 .../cache/GridCachePartitionExchangeManager.java   |  2 +-
 .../processors/cache/GridCacheProcessor.java       |  2 +-
 .../cache/transactions/IgniteTxManager.java        |  3 +-
 .../ignite/spi/discovery/tcp/ServerImpl.java       | 65 +++++++++++-----------
 .../cache/CacheSerializableTransactionsTest.java   | 15 ++---
 .../junits/common/GridCommonAbstractTest.java      |  8 +--
 11 files changed, 58 insertions(+), 61 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index f0f4491..ff906be 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -639,12 +639,10 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
     /**
      * @param msg Change request.
      * @param topVer Current topology version.
-     * @param crd Coordinator flag.
      * @return Closed caches IDs.
      */
     private Set<Integer> processCacheCloseRequests(
         ClientCacheChangeDummyDiscoveryMessage msg,
-        boolean crd,
         AffinityTopologyVersion topVer
     ) {
         Set<String> cachesToClose = msg.cachesToClose();
@@ -706,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends 
GridCacheSharedManagerAdap
 
         // Check and close caches via dummy message.
         if (msg.cachesToClose() != null)
-            closedCaches = processCacheCloseRequests(msg, crd, topVer);
+            closedCaches = processCacheCloseRequests(msg, topVer);
 
         // Shedule change message.
         if (startedCaches != null || closedCaches != null)
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
index 7f71c82..88d44cc 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java
@@ -502,7 +502,7 @@ public class DynamicCacheChangeRequest implements 
Serializable {
             ", clientStartOnly=" + clientStartOnly +
             ", stop=" + stop +
             ", destroy=" + destroy +
-            ", disabledAfterStart" + disabledAfterStart +
+            ", disabledAfterStart=" + disabledAfterStart +
             ']';
     }
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
index b31c6f0..cbe7df4 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java
@@ -93,14 +93,14 @@ public class ExchangeActions {
      * @return New caches start requests.
      */
     public Collection<CacheActionData> cacheStartRequests() {
-        return cachesToStart != null ? cachesToStart.values() : 
Collections.<CacheActionData>emptyList();
+        return cachesToStart != null ? cachesToStart.values() : 
Collections.emptyList();
     }
 
     /**
      * @return Stop cache requests.
      */
     public Collection<CacheActionData> cacheStopRequests() {
-        return cachesToStop != null ? cachesToStop.values() : 
Collections.<CacheActionData>emptyList();
+        return cachesToStop != null ? cachesToStop.values() : 
Collections.emptyList();
     }
 
     /**
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
index 1b9e610..4c361d9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java
@@ -1599,7 +1599,7 @@ public class GatewayProtectedCacheProxy<K, V> extends 
AsyncSupportAdapter<Ignite
             IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate;
 
             try {
-                IgniteCacheProxy<K, V> proxy = 
context().kernalContext().cache().<K, V>publicJCache(context().name());
+                IgniteCacheProxy<K, V> proxy = 
context().kernalContext().cache().publicJCache(context().name());
 
                 if (proxy != null) {
                     proxyImpl.opportunisticRestart(proxy.internalProxy());
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
index b0221a7..2fde831 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java
@@ -1577,13 +1577,17 @@ public abstract class GridCacheMapEntry extends 
GridMetadataAwareAdapter impleme
 
             recordNodeId(affNodeId, topVer);
 
-            if (metrics && cctx.statisticsEnabled()) {
+            if (metrics && cctx.statisticsEnabled() && tx != null) {
                 cctx.cache().metrics0().onWrite();
 
-                T2<GridCacheOperation, CacheObject> entryProcRes = 
tx.entry(txKey()).entryProcessorCalculatedValue();
+                IgniteTxEntry txEntry = tx.entry(txKey());
 
-                if (entryProcRes != null && UPDATE.equals(entryProcRes.get1()))
-                    cctx.cache().metrics0().onInvokeUpdate(old != null);
+                if (txEntry != null) {
+                    T2<GridCacheOperation, CacheObject> entryProcRes = 
txEntry.entryProcessorCalculatedValue();
+
+                    if (entryProcRes != null && 
UPDATE.equals(entryProcRes.get1()))
+                        cctx.cache().metrics0().onInvokeUpdate(old != null);
+                }
             }
 
             if (evt && newVer != null && 
cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 5a80d8e..1f27819 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -1048,7 +1048,7 @@ public class GridCachePartitionExchangeManager<K, V> 
extends GridCacheSharedMana
 
         CacheGroupDescriptor grpDesc = 
cctx.affinity().cacheGroups().get(grpId);
 
-        assert grpDesc != null : grpId;
+        assert grpDesc != null : "grpId=" + grpId;
 
         CacheConfiguration<?, ?> ccfg = grpDesc.config();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
index faa98de..7c1d13d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java
@@ -287,7 +287,7 @@ public class GridCacheProcessor extends 
GridProcessorAdapter {
     /** Transaction interface implementation. */
     private IgniteTransactionsImpl transactions;
 
-    /** Pending cache starts. */
+    /** Pending cache operations. */
     private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new 
ConcurrentHashMap<>();
 
     /** Template configuration add futures. */
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
index fb3a892..53320e6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java
@@ -3202,7 +3202,8 @@ public class IgniteTxManager extends 
GridCacheSharedManagerAdapter {
                     for (Map.Entry<GridCacheMapEntry, Integer> info : 
store.entrySet()) {
                         GridCacheAdapter<Object, Object> cacheCtx = 
info.getKey().context().cache();
 
-                        metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new 
ArrayList<>()).add(info);
+                        if (cacheCtx != null)
+                            metricPerCacheStore.computeIfAbsent(cacheCtx, k -> 
new ArrayList<>()).add(info);
                     }
 
                     store.clear();
diff --git 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 2b0858c..0c1b185 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -2751,14 +2751,11 @@ class ServerImpl extends TcpDiscoveryImpl {
                     PendingMessage pm = new PendingMessage(msg);
 
                     this.msgs.add(pm);
-
-                    if (pm.customMsg && pm.id.equals(customDiscardId))
-                        this.customDiscardId = customDiscardId;
-
-                    if (!pm.customMsg && pm.id.equals(discardId))
-                        this.discardId = discardId;
                 }
             }
+
+            this.discardId = discardId;
+            this.customDiscardId = customDiscardId;
         }
 
         /**
@@ -6227,32 +6224,8 @@ class ServerImpl extends TcpDiscoveryImpl {
          */
         private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, 
boolean waitForNotification) {
             if (isLocalNodeCoordinator()) {
-                boolean delayMsg;
-
-                assert ring.minimumNodeVersion() != null : ring;
-
-                boolean joiningEmpty;
-
-                synchronized (mux) {
-                    joiningEmpty = joiningNodes.isEmpty();
-                }
-
-                delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;
-
-                if (delayMsg) {
-                    if (log.isDebugEnabled()) {
-                        synchronized (mux) {
-                            log.debug("Delay custom message processing, there 
are joining nodes [msg=" + msg +
-                                ", joiningNodes=" + joiningNodes + ']');
-                        }
-                    }
-
-                    synchronized (mux) {
-                        pendingCustomMsgs.add(msg);
-                    }
-
+                if (posponeUndeliveredMessages(msg))
                     return;
-                }
 
                 if (!msg.verified()) {
                     msg.verify(getLocalNodeId());
@@ -6337,6 +6310,36 @@ class ServerImpl extends TcpDiscoveryImpl {
         }
 
         /**
+         * If new node is in the progress of being added we must store and 
resend undelivered messages.
+         *
+         * @param msg Processed message.
+         * @return {@code true} If message was appended to pending queue.
+         */
+        private boolean posponeUndeliveredMessages(final 
TcpDiscoveryCustomEventMessage msg) {
+            boolean joiningEmpty;
+
+            synchronized (mux) {
+                joiningEmpty = joiningNodes.isEmpty();
+
+                if (log.isDebugEnabled())
+                    log.debug("Delay custom message processing, there are 
joining nodes [msg=" + msg +
+                        ", joiningNodes=" + joiningNodes + ']');
+            }
+
+            boolean delayMsg = msg.topologyVersion() == 0L && !joiningEmpty;
+
+            if (delayMsg) {
+                synchronized (mux) {
+                    pendingCustomMsgs.add(msg);
+                }
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
          * Checks failed nodes list and sends {@link 
TcpDiscoveryNodeFailedMessage} if failed node is still in the
          * ring and node detected failure left ring.
          */
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
index 65859ca..07251ac 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java
@@ -74,7 +74,6 @@ import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionOptimisticException;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -186,7 +185,7 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
     private void txStreamerLoad(Ignite ignite,
         Integer key,
         String cacheName,
-        boolean allowOverwrite) throws Exception {
+        boolean allowOverwrite) {
         IgniteCache<Integer, Integer> cache = ignite.cache(cacheName);
 
         log.info("Test key: " + key);
@@ -2824,7 +2823,6 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226";)
     @Test
     public void testReadWriteTransactionsNoDeadlock() throws Exception {
         checkReadWriteTransactionsNoDeadlock(false);
@@ -2833,7 +2831,6 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
     /**
      * @throws Exception If failed.
      */
-    @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226";)
     @Test
     public void testReadWriteTransactionsNoDeadlockMultinode() throws 
Exception {
         checkReadWriteTransactionsNoDeadlock(true);
@@ -2844,8 +2841,6 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
      * @throws Exception If failed.
      */
     private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) 
throws Exception {
-        fail("https://issues.apache.org/jira/browse/IGNITE-9226";);
-
         final Ignite ignite0 = ignite(0);
 
         for (final CacheConfiguration<Integer, Integer> ccfg : 
cacheConfigurations()) {
@@ -4140,7 +4135,7 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
 
             if (nonSer) {
                 nonSerFut = runMultiThreadedAsync(new Callable<Void>() {
-                    @Override public Void call() throws Exception {
+                    @Override public Void call() {
                         int nodeIdx = idx.getAndIncrement() % clients.size();
 
                         Ignite node = clients.get(nodeIdx);
@@ -4198,7 +4193,7 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
             }
 
             final IgniteInternalFuture<?> fut = runMultiThreadedAsync(new 
Callable<Void>() {
-                @Override public Void call() throws Exception {
+                @Override public Void call() {
                     int nodeIdx = idx.getAndIncrement() % clients.size();
 
                     Ignite node = clients.get(nodeIdx);
@@ -4210,8 +4205,8 @@ public class CacheSerializableTransactionsTest extends 
GridCommonAbstractTest {
                     final IgniteTransactions txs = node.transactions();
 
                     final IgniteCache<Integer, Account> cache =
-                        nearCache ? node.createNearCache(cacheName, new 
NearCacheConfiguration<Integer, Account>()) :
-                            node.<Integer, Account>cache(cacheName);
+                        nearCache ? node.createNearCache(cacheName, new 
NearCacheConfiguration<>()) :
+                            node.cache(cacheName);
 
                     assertNotNull(cache);
 
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
index 95aea5b..7075635 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java
@@ -1359,10 +1359,8 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
      * @param cnt Keys count.
      * @param startFrom Start value for keys search.
      * @return Collection of keys for which given cache is neither primary nor 
backup.
-     * @throws IgniteCheckedException If failed.
      */
-    protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int 
startFrom)
-        throws IgniteCheckedException {
+    protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int 
startFrom) {
         return findKeys(cache, cnt, startFrom, 2);
     }
 
@@ -1549,10 +1547,8 @@ public abstract class GridCommonAbstractTest extends 
GridAbstractTest {
     /**
      * @param cache Cache.
      * @return Key for which given cache is neither primary nor backup.
-     * @throws IgniteCheckedException If failed.
      */
-    protected Integer nearKey(IgniteCache<?, ?> cache)
-        throws IgniteCheckedException {
+    protected Integer nearKey(IgniteCache<?, ?> cache) {
         return nearKeys(cache, 1, 1).get(0);
     }
 

Reply via email to