This is an automated email from the ASF dual-hosted git repository.
sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new b73d2609daf IGNITE-25641 Fix discrepancy with CacheStore in case of an
error during tx commit phase (#12178)
b73d2609daf is described below
commit b73d2609daf6b906b55fc5bf45e3b30ea202cf1d
Author: Sergey Chugunov <[email protected]>
AuthorDate: Thu Aug 7 17:27:29 2025 +0400
IGNITE-25641 Fix discrepancy with CacheStore in case of an error during tx
commit phase (#12178)
---
.../cache/transactions/IgniteTxAdapter.java | 6 +-
.../store/CacheStoreWithIgniteTxFailureTest.java | 315 +++++++++++++++++++++
.../ignite/testsuites/IgniteCacheTestSuite4.java | 2 +
3 files changed, 322 insertions(+), 1 deletion(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
index ce8c022e85c..193eb2a4e8a 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxAdapter.java
@@ -512,8 +512,12 @@ public abstract class IgniteTxAdapter extends
GridMetadataAwareAdapter implement
try {
GridCacheEntryEx entry = e.cached();
- if (e.op() != NOOP)
+ if (e.op() != NOOP) {
entry.invalidate(xidVer);
+
+ if (e.context().readThrough())
+ entry.clear(xidVer, true);
+ }
}
catch (Throwable t) {
U.error(log, "Failed to invalidate transaction entries while
reverting a commit.", t);
diff --git
a/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
new file mode 100644
index 00000000000..6f8495d8042
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/cache/store/CacheStoreWithIgniteTxFailureTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.cache.store;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.IntFunction;
+import javax.cache.Cache;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.CacheInterceptor;
+import org.apache.ignite.cache.CacheInterceptorAdapter;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.failure.FailureHandler;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import org.apache.ignite.internal.processors.cache.GridCacheAbstractSelfTest;
+import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.transactions.Transaction;
+import org.jetbrains.annotations.Nullable;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests scenarios with system failures during transaction commit. Internal system failures are simulated by
+ * a custom {@link CacheInterceptor} implementation that throws an exception during the final commit phase.
+ */
+@RunWith(Parameterized.class)
+public class CacheStoreWithIgniteTxFailureTest extends
GridCacheAbstractSelfTest {
+ /** */
+ private static final int GRID_COUNT = 3;
+
+ /** */
+ private static final int KEYS_NUMBER = 50;
+
+ /** */
+ private static final int FAULTY_NODE_IDX = 1;
+
+ /** */
+ private static final IntFunction<Integer> KEY_UPDATE_FUNCTION = key -> key
+ KEYS_NUMBER * 3;
+
+ /**
+ * Type of the faulty node for keys involved in the transaction: primary or backup.
+ */
+ private enum FaultyNodeType {
+ /** */
+ PRIMARY,
+ /** */
+ BACKUP
+ }
+
+ /**
+ * Role of the faulty node in transaction management: tx coordinator or regular node.
+ */
+ private enum FaultyNodeRole {
+ /** */
+ REGULAR,
+ /** */
+ TX_COORDINATOR
+ }
+
+ /** */
+ @Parameterized.Parameter
+ public FaultyNodeType faultyNodeType;
+
+ /** */
+ @Parameterized.Parameter(1)
+ public FaultyNodeRole faultyNodeRole;
+
+ /** */
+ @Parameterized.Parameter(2)
+ public boolean withFaulireHnd;
+
+ /** */
+ @Parameterized.Parameters(name = "faultyNodeType={0}, faultyNodeRole={1},
withFaulireHandler={2}")
+ public static List<Object[]> parameters() {
+ List<Object[]> params = new ArrayList<>();
+
+ params.add(new Object[] {FaultyNodeType.PRIMARY,
FaultyNodeRole.REGULAR, true});
+ params.add(new Object[] {FaultyNodeType.PRIMARY,
FaultyNodeRole.REGULAR, false});
+ params.add(new Object[] {FaultyNodeType.BACKUP,
FaultyNodeRole.REGULAR, true});
+ params.add(new Object[] {FaultyNodeType.BACKUP,
FaultyNodeRole.REGULAR, false});
+
+ params.add(new Object[] {FaultyNodeType.PRIMARY,
FaultyNodeRole.TX_COORDINATOR, false});
+ params.add(new Object[] {FaultyNodeType.BACKUP,
FaultyNodeRole.TX_COORDINATOR, false});
+ params.add(new Object[] {FaultyNodeType.PRIMARY,
FaultyNodeRole.TX_COORDINATOR, true});
+ params.add(new Object[] {FaultyNodeType.BACKUP,
FaultyNodeRole.TX_COORDINATOR, true});
+
+ return params;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ initStoreStrategy();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ stopAllGrids();
+
+ storeStgy.resetStore();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return GRID_COUNT;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int backups() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected FailureHandler getFailureHandler(String
igniteInstanceName) {
+ return withFaulireHnd ? new StopNodeFailureHandler() :
super.getFailureHandler(igniteInstanceName);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected CacheConfiguration<?, ?> cacheConfiguration(String
igniteInstanceName) throws Exception {
+ CacheConfiguration<Integer, Integer> ccfg =
(CacheConfiguration<Integer,
Integer>)super.cacheConfiguration(igniteInstanceName);
+
+ ccfg.setInterceptor(
+ new FaultyNodeInterceptor(
+ igniteInstanceName,
+ getTestIgniteInstanceIndex(igniteInstanceName) ==
FAULTY_NODE_IDX,
+ faultyNodeRole
+ )
+ );
+
+ return ccfg;
+ }
+
+ /**
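+ * Updates keys located on the faulty node in a transaction while the interceptor on that node throws,
+ * then checks that cache values are consistent with the values in the CacheStore.
+ *
+ * @throws Exception If failed.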
+ */
+ @Test
+ public void testSystemExceptionAfterCacheStoreCommit() throws Exception {
+ IgniteEx ig = startGrids(gridCount());
+ IgniteCache<Integer, Integer> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+ fillCache(cache, KEYS_NUMBER);
+
+ int keysType = faultyNodeType == FaultyNodeType.PRIMARY ? 0 : 1;
+
+ List<Integer> keysOnFaultyNode =
findKeys(grid(FAULTY_NODE_IDX).localNode(), cache, 5, 0, keysType);
+
+ IgniteEx txCoordinator =
+ faultyNodeRole == FaultyNodeRole.TX_COORDINATOR
+ ? grid(FAULTY_NODE_IDX)
+ : startClientGrid(GRID_COUNT + 1);
+
+ if (faultyNodeType == FaultyNodeType.PRIMARY)
+ updateKeysInTxWithExceptionCatching(txCoordinator,
keysOnFaultyNode);
+ else
+ updateKeysInTx(txCoordinator, keysOnFaultyNode);
+
+ if (withFaulireHnd) {
+ // The failure handler (FH) currently doesn't fail the TX coordinator node; this behavior is wrong
+ // and should be fixed here:
+ // TODO https://issues.apache.org/jira/browse/IGNITE-26060
+ if (faultyNodeRole != FaultyNodeRole.TX_COORDINATOR) {
+ waitForTopology(3);
+
+ assertTrue("Client node should survive test scenario",
+ G.allGrids()
+ .stream()
+ .filter(ignite ->
((IgniteEx)ignite).context().clientNode())
+ .count() == 1);
+ }
+ }
+ else
+ checkKeysOnFaultyNode(keysOnFaultyNode);
+
+ checkKeysOnHealthyNodes(keysOnFaultyNode);
+ }
+
+ /** */
+ private void fillCache(IgniteCache<Integer, Integer> cache, int numOfKeys)
{
+ for (int i = 0; i < numOfKeys; i++)
+ cache.put(i, i);
+ }
+
+ /** */
+ private void checkKeysOnFaultyNode(List<Integer> keysToCheck) {
+ IgniteCache<Object, Object> cache =
grid(FAULTY_NODE_IDX).cache(DEFAULT_CACHE_NAME);
+
+ for (Integer key : keysToCheck)
+ assertEquals(storeStgy.getFromStore(key), cache.get(key));
+ }
+
+ /** */
+ private void checkKeysOnHealthyNodes(List<Integer> keysToCheck) throws
IgniteInterruptedCheckedException {
+ for (int i = 0; i < gridCount(); i++) {
+ if (i != FAULTY_NODE_IDX) {
+ IgniteEx ig = grid(i);
+
+ IgniteCache<Object, Object> cache =
ig.cache(DEFAULT_CACHE_NAME);
+
+ for (Integer key : keysToCheck) {
+ boolean checkResult = waitForCondition(
+ () -> {
+ if (storeStgy.getFromStore(key) == null ||
cache.get(key) == null)
+ return false;
+
+ return
storeStgy.getFromStore(key).equals(cache.get(key));
+ },
+ 1_000);
+
+ assertTrue(
+ String.format("Key inconsistent with CacheStore found
on node %d; nodeName: %s. " +
+ "Key in store: %s, key in cache: %s.",
+ i,
+ ig.name(),
+ storeStgy.getFromStore(key),
+ cache.get(key)),
+ checkResult);
+ }
+ }
+ }
+ }
+
+ /** */
+ private void updateKeysInTx(Ignite ig, List<Integer> keys) {
+ IgniteCache<Object, Object> cache = ig.cache(DEFAULT_CACHE_NAME);
+
+ try (Transaction tx = ig.transactions().txStart()) {
+ for (Integer key : keys)
+ cache.put(key, KEY_UPDATE_FUNCTION.apply(key));
+
+ tx.commit();
+ }
+ }
+
+ /** */
+ private void updateKeysInTxWithExceptionCatching(Ignite ig, List<Integer>
keys) {
+ try {
+ updateKeysInTx(ig, keys);
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ }
+
+ /** */
+ private static class FaultyNodeInterceptor extends
CacheInterceptorAdapter<Integer, Integer> {
+ /** */
+ private final FaultyNodeRole faultyNodeRole;
+
+ /** */
+ private final String instanceName;
+
+ /** */
+ private final boolean faultyNode;
+
+ /** */
+ private final Set<Integer> seenKeys = ConcurrentHashMap.newKeySet();
+
+ /**
+ * @param instanceName Ignite node instance name.
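+ * @param faultyNode Whether this interceptor belongs to the faulty node.
+ * @param faultyNodeRole Role of the faulty node in the transaction: tx coordinator or regular node.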
+ */
+ private FaultyNodeInterceptor(String instanceName, boolean faultyNode,
FaultyNodeRole faultyNodeRole) {
+ this.instanceName = instanceName;
+ this.faultyNode = faultyNode;
+ this.faultyNodeRole = faultyNodeRole;
+ }
+
+ /** {@inheritDoc} */
+ @Override public @Nullable Integer onBeforePut(Cache.Entry<Integer,
Integer> entry, Integer newVal) {
+ // This is the initial cache loading; the actual test logic will be executed later.
+ if (newVal < 2 * KEYS_NUMBER)
+ return newVal;
+
+ if (faultyNode) {
+ if (faultyNodeRole == FaultyNodeRole.TX_COORDINATOR) {
+ // On the TX coordinator node CacheInterceptor#onBeforePut is called twice for the same key
+ // at different stages of the TX handling path.
+ if (!seenKeys.add(newVal))
+ throw new IgniteException("IgniteException from
onBeforePut on tx coordinator: " + instanceName);
+ }
+ else
+ throw new IgniteException("IgniteException from
onBeforePut on primary or backup: " + instanceName);
+ }
+
+ return newVal;
+ }
+ }
+}
diff --git
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 7a225e4890c..bce6d557f1b 100644
---
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -24,6 +24,7 @@ import
org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledAtomicCa
import
org.apache.ignite.cache.store.CacheStoreListenerRWThroughDisabledTransactionalCacheTest;
import
org.apache.ignite.cache.store.CacheStoreSessionListenerLifecycleSelfTest;
import
org.apache.ignite.cache.store.CacheStoreSessionListenerWriteBehindEnabledTest;
+import org.apache.ignite.cache.store.CacheStoreWithIgniteTxFailureTest;
import
org.apache.ignite.cache.store.jdbc.CacheJdbcStoreSessionListenerSelfTest;
import
org.apache.ignite.internal.processors.GridCacheTxLoadFromStoreOnLockSelfTest;
import org.apache.ignite.internal.processors.cache.CacheClientStoreSelfTest;
@@ -280,6 +281,7 @@ public class IgniteCacheTestSuite4 {
GridTestUtils.addTestIfNeeded(suite,
CacheStoreListenerRWThroughDisabledAtomicCacheTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CacheStoreListenerRWThroughDisabledTransactionalCacheTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CacheStoreSessionListenerWriteBehindEnabledTest.class, ignoredTests);
+ GridTestUtils.addTestIfNeeded(suite,
CacheStoreWithIgniteTxFailureTest.class, ignoredTests);
GridTestUtils.addTestIfNeeded(suite, CacheClientStoreSelfTest.class,
ignoredTests);
GridTestUtils.addTestIfNeeded(suite,
CacheStoreUsageMultinodeStaticStartAtomicTest.class, ignoredTests);