This is an automated email from the ASF dual-hosted git repository.
mpetrov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 8eb82c4b343 IGNITE-27109 Fixed missing error for partially successful
putAll on ATOMIC, FULL_SYNC caches during primary node stop (#12656)
8eb82c4b343 is described below
commit 8eb82c4b343da73659d515e6052ff355e43907a6
Author: Mikhail Petrov <[email protected]>
AuthorDate: Tue Jan 27 20:05:37 2026 +0300
IGNITE-27109 Fixed missing error for partially successful putAll on ATOMIC,
FULL_SYNC caches during primary node stop (#12656)
---
.../atomic/GridDhtAtomicAbstractUpdateFuture.java | 11 +-
...iteCacheAtomicFullSyncPartialUpdateAllTest.java | 245 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite13.java | 3 +
3 files changed, 256 insertions(+), 3 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
index 9831dc13598..612d01dc178 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicAbstractUpdateFuture.java
@@ -377,10 +377,13 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
return;
}
+ boolean hasPrimaryUpdErrors = updateRes.errors() != null;
+
boolean needReplyToNear = updateReq.writeSynchronizationMode() ==
PRIMARY_SYNC ||
!ret.emptyResult() ||
updateReq.nearCache() ||
- cctx.localNodeId().equals(nearNode.id());
+ cctx.localNodeId().equals(nearNode.id()) ||
+ hasPrimaryUpdErrors;
boolean needMapping = updateReq.fullSync() &&
(updateReq.needPrimaryResponse() || !sendAllToDht());
@@ -402,8 +405,10 @@ public abstract class GridDhtAtomicAbstractUpdateFuture
extends GridCacheFutureA
needReplyToNear = true;
}
- // If there are readers updates then nearNode should not finish before
primary response received.
- sendDhtRequests(nearNode, ret, !readersOnlyNodes);
+ // "Near" node should not finish until it receives a response from primary node in the following cases:
+ // - only "near cache" is updated
+ // - primary failed to process at least one of the keys
+ sendDhtRequests(nearNode, ret, !(readersOnlyNodes ||
hasPrimaryUpdErrors));
if (needReplyToNear)
completionCb.apply(updateReq, updateRes);
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicFullSyncPartialUpdateAllTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicFullSyncPartialUpdateAllTest.java
new file mode 100644
index 00000000000..d690304d0bb
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/IgniteCacheAtomicFullSyncPartialUpdateAllTest.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.Cache;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CachePartialUpdateException;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import
org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
+import
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+
+import static java.util.Collections.singletonMap;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
+import static org.apache.ignite.internal.TestRecordingCommunicationSpi.spi;
+import static
org.apache.ignite.internal.processors.cache.GridCacheUtils.cacheId;
+import static
org.apache.ignite.internal.processors.cache.distributed.GridCacheModuloAffinityFunction.IDX_ATTR;
+
+/**
+ * Checks that {@code putAll} on an ATOMIC, FULL_SYNC cache does not silently succeed when the primary node fails to
+ * update some of the keys, even if all backup responses have already reached the near node.
+ */
+public class IgniteCacheAtomicFullSyncPartialUpdateAllTest extends GridCommonAbstractTest {
+    /** First key of the test data that belongs to the node with index 1. */
+    public static final int NODE_1_FIRST_KEY = 1;
+
+    /** Second key of the test data that belongs to the node with index 1. */
+    public static final int NODE_1_SECOND_KEY = 4;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        return super.getConfiguration(igniteInstanceName)
+            .setCommunicationSpi(new TestRecordingCommunicationSpi())
+            .setUserAttributes(singletonMap(IDX_ATTR, getTestIgniteInstanceIndex(igniteInstanceName)));
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+    }
+
+    /**
+     * In the following scenario the primary node fails to send {@link GridNearAtomicUpdateResponse} to the near node
+     * during shutdown. As a result, entries belonging to the left primary node are remapped to the new topology
+     * version by the near node. The operation is therefore expected either to succeed completely or to fail with
+     * {@link CachePartialUpdateException}.
+     */
+    @Test
+    public void testCacheEntriesProcessingFailureCausedByNodeStop() throws Exception {
+        startGridsMultiThreaded(3);
+
+        TestInterceptor.putStartedLatch = new CountDownLatch(1);
+        TestInterceptor.putUnblockedLatch = new CountDownLatch(1);
+
+        IgniteCache<Integer, Integer> cache = grid(0).createCache(createTestCacheConfiguration(false));
+
+        IgniteInternalFuture<Object> putFut = GridTestUtils.runAsync(() -> cache.putAll(createTestData()));
+
+        // Node 1's interceptor is now parked while processing NODE_1_FIRST_KEY.
+        assertTrue(TestInterceptor.putStartedLatch.await(getTestTimeout(), MILLISECONDS));
+
+        IgniteEx stoppingNode = grid(1);
+
+        IgniteInternalFuture<Object> stopFut = GridTestUtils.runAsync(() -> stopGrid(1));
+
+        try {
+            // Release the parked put only after the stopping node has flagged its cache group offheap as stopping.
+            assertTrue(GridTestUtils.waitForCondition(() -> U.<AtomicBoolean>field(
+                stoppingNode.context().cache().cacheGroup(cacheId(DEFAULT_CACHE_NAME)).offheap(), "stopping").get(),
+                getTestTimeout()
+            ));
+
+            TestInterceptor.putUnblockedLatch.countDown();
+
+            putFut.get(getTestTimeout(), MILLISECONDS);
+
+            assertNotNull(cache.get(0));
+            assertNotNull(cache.get(NODE_1_FIRST_KEY));
+            assertNotNull(cache.get(NODE_1_SECOND_KEY));
+        }
+        catch (Exception e) {
+            // putFut.get() does not rethrow the putAll() failure as is (failures are reported as checked exceptions),
+            // so the partial update error has to be looked up in the cause chain.
+            Throwable partialUpdErr = e;
+
+            while (partialUpdErr != null && !(partialUpdErr instanceof CachePartialUpdateException))
+                partialUpdErr = partialUpdErr.getCause();
+
+            // Any other failure (timeout, unexpected error) must still fail the test.
+            if (partialUpdErr == null)
+                throw e;
+
+            assertTrue(partialUpdErr.getMessage().contains("Failed to update keys (retry update if possible)"));
+        }
+        finally {
+            stopFut.get(getTestTimeout(), MILLISECONDS);
+        }
+    }
+
+    /**
+     * In the following scenario the near node does not complete putAll until {@link GridNearAtomicUpdateResponse},
+     * containing cache entry processing errors from the primary node, is received, even when the near node has
+     * already received all responses from the backup nodes.
+     */
+    @Test
+    public void testCacheEntriesProcessingFailureCausedByInterceptorException() throws Exception {
+        startGridsMultiThreaded(3);
+
+        IgniteCache<Integer, Integer> cache = grid(0).createCache(createTestCacheConfiguration(true));
+
+        CountDownLatch backupResponsesReceivedLatch = new CountDownLatch(3);
+
+        grid(0).context().io().addMessageListener(TOPIC_CACHE, (n, m, p) -> {
+            if (m instanceof GridDhtAtomicNearResponse)
+                backupResponsesReceivedLatch.countDown();
+        });
+
+        // Hold back node 1's response to the near node until all backup responses have been received.
+        spi(grid(1)).blockMessages((n, m) -> m instanceof GridNearAtomicUpdateResponse);
+
+        IgniteInternalFuture<Object> putFut = GridTestUtils.runAsync(() -> cache.putAll(createTestData()));
+
+        spi(grid(1)).waitForBlocked();
+
+        assertTrue(backupResponsesReceivedLatch.await(getTestTimeout(), MILLISECONDS));
+
+        spi(grid(1)).stopBlock();
+
+        GridTestUtils.assertThrowsAnyCause(
+            log,
+            () -> {
+                putFut.get(getTestTimeout(), MILLISECONDS);
+
+                return null;
+            },
+            CachePartialUpdateCheckedException.class,
+            "Failed to update keys (retry update if possible).: [" + NODE_1_SECOND_KEY + ']'
+        );
+    }
+
+    /**
+     * @param forceCacheEntryProcessingError If {@code true}, the interceptor on node 1 throws for
+     *      {@link #NODE_1_SECOND_KEY}; otherwise it blocks the put of {@link #NODE_1_FIRST_KEY}.
+     * @return ATOMIC, FULL_SYNC cache configuration with 2 backups, modulo affinity and the test interceptor.
+     */
+    private CacheConfiguration<Integer, Integer> createTestCacheConfiguration(boolean forceCacheEntryProcessingError) {
+        return new CacheConfiguration<Integer, Integer>()
+            .setName(DEFAULT_CACHE_NAME)
+            .setAtomicityMode(ATOMIC)
+            .setWriteSynchronizationMode(FULL_SYNC)
+            .setBackups(2)
+            .setAffinity(new GridCacheModuloAffinityFunction(3, 2))
+            .setInterceptor(new TestInterceptor(forceCacheEntryProcessingError));
+    }
+
+    /** @return Sorted test entries: one key for node 0 and two keys for node 1. */
+    private Map<Integer, Integer> createTestData() {
+        Map<Integer, Integer> data = new TreeMap<>();
+
+        data.put(0, 0); // node0 entry
+        data.put(NODE_1_FIRST_KEY, 1); // node1 entry
+        data.put(NODE_1_SECOND_KEY, 4); // node1 entry
+
+        return data;
+    }
+
+    /**
+     * Interceptor that acts only on the node with index 1: it either throws for {@code NODE_1_SECOND_KEY} or parks
+     * the put of {@code NODE_1_FIRST_KEY} until {@link #putUnblockedLatch} is released.
+     */
+    public static final class TestInterceptor implements CacheInterceptor<Integer, Integer> {
+        /** Counted down when node 1 starts processing {@code NODE_1_FIRST_KEY}. */
+        public static CountDownLatch putStartedLatch;
+
+        /** Released by the test to let the parked put of {@code NODE_1_FIRST_KEY} proceed. */
+        public static CountDownLatch putUnblockedLatch;
+
+        /** Whether to fail the put of {@code NODE_1_SECOND_KEY} instead of parking {@code NODE_1_FIRST_KEY}. */
+        private final boolean forceCacheEntryProcessingError;
+
+        /** Local node. */
+        @IgniteInstanceResource
+        private IgniteEx ignite;
+
+        /** @param forceCacheEntryProcessingError Whether to fail the put of {@code NODE_1_SECOND_KEY}. */
+        public TestInterceptor(boolean forceCacheEntryProcessingError) {
+            this.forceCacheEntryProcessingError = forceCacheEntryProcessingError;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Integer onGet(Integer key, @Nullable Integer val) {
+            return val;
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer, Integer> entry, Integer newVal) {
+            if (ignite.localNode().<Integer>attribute(IDX_ATTR) != 1)
+                return newVal;
+
+            if (forceCacheEntryProcessingError) {
+                if (entry.getKey() == NODE_1_SECOND_KEY)
+                    throw new RuntimeException("expected");
+            }
+            else if (entry.getKey() == NODE_1_FIRST_KEY) {
+                putStartedLatch.countDown();
+
+                try {
+                    assertTrue(putUnblockedLatch.await(5000, MILLISECONDS));
+                }
+                catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+
+                    throw new IgniteException(e);
+                }
+            }
+
+            return newVal;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterPut(Cache.Entry<Integer, Integer> entry) {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public @Nullable IgniteBiTuple<Boolean, Integer> onBeforeRemove(Cache.Entry<Integer, Integer> entry) {
+            return null;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onAfterRemove(Cache.Entry<Integer, Integer> entry) {
+            // No-op.
+        }
+    }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
index c7c4200a009..a2a17c5d2b1 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite13.java
@@ -47,6 +47,7 @@ import
org.apache.ignite.internal.processors.cache.GridCacheIoManagerRetryTest;
import
org.apache.ignite.internal.processors.cache.GridCacheLongRunningTransactionDiagnosticsTest;
import
org.apache.ignite.internal.processors.cache.GridCacheVersionGenerationWithCacheStorageTest;
import
org.apache.ignite.internal.processors.cache.distributed.FailBackupOnAtomicOperationTest;
+import
org.apache.ignite.internal.processors.cache.distributed.dht.atomic.IgniteCacheAtomicFullSyncPartialUpdateAllTest;
import
org.apache.ignite.internal.processors.cache.distributed.rebalancing.RebalanceStatisticsTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxAsyncOpsSemaphorePermitsExceededTest;
import
org.apache.ignite.internal.processors.cache.transactions.TxRecoveryOnCoordniatorFailTest;
@@ -127,6 +128,8 @@ public class IgniteCacheTestSuite13 {
GridTestUtils.addTestIfNeeded(suite,
GridCacheIoManagerRetryTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
IgniteCacheAtomicFullSyncPartialUpdateAllTest.class, ignoredTests);
+
return suite;
}
}