This is an automated email from the ASF dual-hosted git repository. ibessonov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 8113ec0 IGNITE-14375 Pending messages can be erroneously send (#8943) 8113ec0 is described below commit 8113ec0c6edfea35e57cfe17333041ba34ccc108 Author: Evgeniy Stanilovskiy <stanilov...@gmail.com> AuthorDate: Fri Apr 2 11:00:06 2021 +0300 IGNITE-14375 Pending messages can be erroneously send (#8943) --- .../cache/CacheAffinitySharedManager.java | 4 +- .../cache/DynamicCacheChangeRequest.java | 2 +- .../internal/processors/cache/ExchangeActions.java | 4 +- .../cache/GatewayProtectedCacheProxy.java | 2 +- .../processors/cache/GridCacheMapEntry.java | 12 ++-- .../cache/GridCachePartitionExchangeManager.java | 2 +- .../processors/cache/GridCacheProcessor.java | 2 +- .../cache/transactions/IgniteTxManager.java | 3 +- .../ignite/spi/discovery/tcp/ServerImpl.java | 65 +++++++++++----------- .../cache/CacheSerializableTransactionsTest.java | 15 ++--- .../junits/common/GridCommonAbstractTest.java | 8 +-- 11 files changed, 58 insertions(+), 61 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java index f0f4491..ff906be 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java @@ -639,12 +639,10 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap /** * @param msg Change request. * @param topVer Current topology version. - * @param crd Coordinator flag. * @return Closed caches IDs. */ private Set<Integer> processCacheCloseRequests( ClientCacheChangeDummyDiscoveryMessage msg, - boolean crd, AffinityTopologyVersion topVer ) { Set<String> cachesToClose = msg.cachesToClose(); @@ -706,7 +704,7 @@ public class CacheAffinitySharedManager<K, V> extends GridCacheSharedManagerAdap // Check and close caches via dummy message. if (msg.cachesToClose() != null) - closedCaches = processCacheCloseRequests(msg, crd, topVer); + closedCaches = processCacheCloseRequests(msg, topVer); // Shedule change message. if (startedCaches != null || closedCaches != null) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java index 7f71c82..88d44cc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/DynamicCacheChangeRequest.java @@ -502,7 +502,7 @@ public class DynamicCacheChangeRequest implements Serializable { ", clientStartOnly=" + clientStartOnly + ", stop=" + stop + ", destroy=" + destroy + - ", disabledAfterStart" + disabledAfterStart + + ", disabledAfterStart=" + disabledAfterStart + ']'; } } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java index b31c6f0..cbe7df4 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/ExchangeActions.java @@ -93,14 +93,14 @@ public class ExchangeActions { * @return New caches start requests. */ public Collection<CacheActionData> cacheStartRequests() { - return cachesToStart != null ? cachesToStart.values() : Collections.<CacheActionData>emptyList(); + return cachesToStart != null ? cachesToStart.values() : Collections.emptyList(); } /** * @return Stop cache requests. */ public Collection<CacheActionData> cacheStopRequests() { - return cachesToStop != null ? cachesToStop.values() : Collections.<CacheActionData>emptyList(); + return cachesToStop != null ? cachesToStop.values() : Collections.emptyList(); } /** diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java index 1b9e610..4c361d9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GatewayProtectedCacheProxy.java @@ -1599,7 +1599,7 @@ public class GatewayProtectedCacheProxy<K, V> extends AsyncSupportAdapter<Ignite IgniteCacheProxyImpl proxyImpl = (IgniteCacheProxyImpl) delegate; try { - IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().<K, V>publicJCache(context().name()); + IgniteCacheProxy<K, V> proxy = context().kernalContext().cache().publicJCache(context().name()); if (proxy != null) { proxyImpl.opportunisticRestart(proxy.internalProxy()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index b0221a7..2fde831 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -1577,13 +1577,17 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme recordNodeId(affNodeId, topVer); - if (metrics && cctx.statisticsEnabled()) { + if (metrics && cctx.statisticsEnabled() && tx != null) { cctx.cache().metrics0().onWrite(); - T2<GridCacheOperation, CacheObject> entryProcRes = tx.entry(txKey()).entryProcessorCalculatedValue(); + IgniteTxEntry txEntry = tx.entry(txKey()); - if (entryProcRes != null && UPDATE.equals(entryProcRes.get1())) - cctx.cache().metrics0().onInvokeUpdate(old != null); + if (txEntry != null) { + T2<GridCacheOperation, CacheObject> entryProcRes = txEntry.entryProcessorCalculatedValue(); + + if (entryProcRes != null && UPDATE.equals(entryProcRes.get1())) + cctx.cache().metrics0().onInvokeUpdate(old != null); + } } if (evt && newVer != null && cctx.events().isRecordable(EVT_CACHE_OBJECT_PUT)) { diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java index 5a80d8e..1f27819 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java @@ -1048,7 +1048,7 @@ public class GridCachePartitionExchangeManager<K, V> extends GridCacheSharedMana CacheGroupDescriptor grpDesc = cctx.affinity().cacheGroups().get(grpId); - assert grpDesc != null : grpId; + assert grpDesc != null : "grpId=" + grpId; CacheConfiguration<?, ?> ccfg = grpDesc.config(); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java index faa98de..7c1d13d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheProcessor.java @@ -287,7 +287,7 @@ public class GridCacheProcessor extends GridProcessorAdapter { /** Transaction interface implementation. */ private IgniteTransactionsImpl transactions; - /** Pending cache starts. */ + /** Pending cache operations. */ private ConcurrentMap<UUID, IgniteInternalFuture> pendingFuts = new ConcurrentHashMap<>(); /** Template configuration add futures. */ diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index fb3a892..53320e6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -3202,7 +3202,8 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { for (Map.Entry<GridCacheMapEntry, Integer> info : store.entrySet()) { GridCacheAdapter<Object, Object> cacheCtx = info.getKey().context().cache(); - metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info); + if (cacheCtx != null) + metricPerCacheStore.computeIfAbsent(cacheCtx, k -> new ArrayList<>()).add(info); } store.clear(); diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java index 2b0858c..0c1b185 100644 --- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java @@ -2751,14 +2751,11 @@ class ServerImpl extends TcpDiscoveryImpl { PendingMessage pm = new PendingMessage(msg); this.msgs.add(pm); - - if (pm.customMsg && pm.id.equals(customDiscardId)) - this.customDiscardId = customDiscardId; - - if (!pm.customMsg && pm.id.equals(discardId)) - this.discardId = discardId; } } + + this.discardId = discardId; + this.customDiscardId = customDiscardId; } /** @@ -6227,32 +6224,8 @@ class ServerImpl extends TcpDiscoveryImpl { */ private void processCustomMessage(TcpDiscoveryCustomEventMessage msg, boolean waitForNotification) { if (isLocalNodeCoordinator()) { - boolean delayMsg; - - assert ring.minimumNodeVersion() != null : ring; - - boolean joiningEmpty; - - synchronized (mux) { - joiningEmpty = joiningNodes.isEmpty(); - } - - delayMsg = msg.topologyVersion() == 0L && !joiningEmpty; - - if (delayMsg) { - if (log.isDebugEnabled()) { - synchronized (mux) { - log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + - ", joiningNodes=" + joiningNodes + ']'); - } - } - - synchronized (mux) { - pendingCustomMsgs.add(msg); - } - + if (posponeUndeliveredMessages(msg)) return; - } if (!msg.verified()) { msg.verify(getLocalNodeId()); @@ -6337,6 +6310,36 @@ class ServerImpl extends TcpDiscoveryImpl { } /** + * If new node is in the progress of being added we must store and resend undelivered messages. + * + * @param msg Processed message. + * @return {@code true} If message was appended to pending queue. + */ + private boolean posponeUndeliveredMessages(final TcpDiscoveryCustomEventMessage msg) { + boolean joiningEmpty; + + synchronized (mux) { + joiningEmpty = joiningNodes.isEmpty(); + + if (log.isDebugEnabled()) + log.debug("Delay custom message processing, there are joining nodes [msg=" + msg + + ", joiningNodes=" + joiningNodes + ']'); + } + + boolean delayMsg = msg.topologyVersion() == 0L && !joiningEmpty; + + if (delayMsg) { + synchronized (mux) { + pendingCustomMsgs.add(msg); + } + + return true; + } + + return false; + } + + /** * Checks failed nodes list and sends {@link TcpDiscoveryNodeFailedMessage} if failed node is still in the * ring and node detected failure left ring. */ diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java index 65859ca..07251ac 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheSerializableTransactionsTest.java @@ -74,7 +74,6 @@ import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionOptimisticException; -import org.junit.Ignore; import org.junit.Test; import static java.util.concurrent.TimeUnit.SECONDS; @@ -186,7 +185,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { private void txStreamerLoad(Ignite ignite, Integer key, String cacheName, - boolean allowOverwrite) throws Exception { + boolean allowOverwrite) { IgniteCache<Integer, Integer> cache = ignite.cache(cacheName); log.info("Test key: " + key); @@ -2824,7 +2823,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226") @Test public void testReadWriteTransactionsNoDeadlock() throws Exception { checkReadWriteTransactionsNoDeadlock(false); @@ -2833,7 +2831,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { /** * @throws Exception If failed. */ - @Ignore("https://issues.apache.org/jira/browse/IGNITE-9226") @Test public void testReadWriteTransactionsNoDeadlockMultinode() throws Exception { checkReadWriteTransactionsNoDeadlock(true); @@ -2844,8 +2841,6 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { * @throws Exception If failed. */ private void checkReadWriteTransactionsNoDeadlock(final boolean multiNode) throws Exception { - fail("https://issues.apache.org/jira/browse/IGNITE-9226"); - final Ignite ignite0 = ignite(0); for (final CacheConfiguration<Integer, Integer> ccfg : cacheConfigurations()) { @@ -4140,7 +4135,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { if (nonSer) { nonSerFut = runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { + @Override public Void call() { int nodeIdx = idx.getAndIncrement() % clients.size(); Ignite node = clients.get(nodeIdx); @@ -4198,7 +4193,7 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { } final IgniteInternalFuture<?> fut = runMultiThreadedAsync(new Callable<Void>() { - @Override public Void call() throws Exception { + @Override public Void call() { int nodeIdx = idx.getAndIncrement() % clients.size(); Ignite node = clients.get(nodeIdx); @@ -4210,8 +4205,8 @@ public class CacheSerializableTransactionsTest extends GridCommonAbstractTest { final IgniteTransactions txs = node.transactions(); final IgniteCache<Integer, Account> cache = - nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<Integer, Account>()) : - node.<Integer, Account>cache(cacheName); + nearCache ? node.createNearCache(cacheName, new NearCacheConfiguration<>()) : + node.cache(cacheName); assertNotNull(cache); diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java index 95aea5b..7075635 100755 --- a/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/testframework/junits/common/GridCommonAbstractTest.java @@ -1359,10 +1359,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { * @param cnt Keys count. * @param startFrom Start value for keys search. * @return Collection of keys for which given cache is neither primary nor backup. - * @throws IgniteCheckedException If failed. */ - protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) - throws IgniteCheckedException { + protected List<Integer> nearKeys(IgniteCache<?, ?> cache, int cnt, int startFrom) { return findKeys(cache, cnt, startFrom, 2); } @@ -1549,10 +1547,8 @@ public abstract class GridCommonAbstractTest extends GridAbstractTest { /** * @param cache Cache. * @return Key for which given cache is neither primary nor backup. - * @throws IgniteCheckedException If failed. */ - protected Integer nearKey(IgniteCache<?, ?> cache) - throws IgniteCheckedException { + protected Integer nearKey(IgniteCache<?, ?> cache) { return nearKeys(cache, 1, 1).get(0); }