This is an automated email from the ASF dual-hosted git repository.
shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new d40158d757b IGNITE-28750 Fix AssertionError in a local CQ during TX
rollback caused by node failure (#13216)
d40158d757b is described below
commit d40158d757b62ea41dc4d45c1fe8327b1d1f6019
Author: oleg-vlsk <[email protected]>
AuthorDate: Tue Jun 9 16:57:28 2026 +1000
IGNITE-28750 Fix AssertionError in a local CQ during TX rollback caused by
node failure (#13216)
---
.../continuous/CacheContinuousQueryHandler.java | 4 +
.../continuous/CacheContinuousQueryListener.java | 3 +
.../continuous/CacheContinuousQueryManager.java | 14 +-
.../LocalContinuousQueryWithNodeFailureTest.java | 246 +++++++++++++++++++++
.../testsuites/IgniteCacheQuerySelfTestSuite6.java | 2 +
5 files changed, 266 insertions(+), 3 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 200a40cb796..28f941a4cba 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -767,6 +767,10 @@ public class CacheContinuousQueryHandler<K, V> implements
GridContinuousHandler
@Override public boolean isPrimaryOnly() {
return locOnly && !skipPrimaryCheck;
}
+
+ /** {@inheritDoc} */
+ @Override public boolean isLocalOnly() {
+ return locOnly;
+ }
};
CacheContinuousQueryManager mgr = manager(ctx);
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index cb15288a52b..faa1e53d035 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -131,4 +131,7 @@ public interface CacheContinuousQueryListener<K, V> {
* @return {@code True} if this listener should be called on events on
primary partitions only.
*/
public boolean isPrimaryOnly();
+
+ /**
+ * @return {@code True} if this listener belongs to a local continuous query, {@code false} otherwise.
+ */
+ public boolean isLocalOnly();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index ed362129e52..1680b9619a2 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -248,15 +248,23 @@ public class CacheContinuousQueryManager<K, V> extends
GridCacheManagerAdapter<K
* @param part Partition number.
* @param cntr Update counter.
* @param topVer Topology version.
+ * @param primary Primary partition flag.
* @return Context.
*/
- @Nullable public CounterSkipContext skipUpdateCounter(@Nullable
CounterSkipContext skipCtx,
+ @Nullable public CounterSkipContext skipUpdateCounter(
+ @Nullable CounterSkipContext skipCtx,
int part,
long cntr,
AffinityTopologyVersion topVer,
- boolean primary) {
- for (CacheContinuousQueryListener lsnr : lsnrs.values())
+ boolean primary
+ ) {
+ for (CacheContinuousQueryListener<?, ?> lsnr : lsnrs.values()) {
+ // Local CQs notify listeners directly and do not use the skipped-counter mechanism.
+ if (lsnr.isLocalOnly())
+ continue;
+
skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr,
topVer, primary);
+ }
return skipCtx;
}
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/LocalContinuousQueryWithNodeFailureTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/LocalContinuousQueryWithNodeFailureTest.java
new file mode 100644
index 00000000000..9f4ce53072b
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/LocalContinuousQueryWithNodeFailureTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests interaction between local continuous queries created with {@link
ContinuousQuery#setLocal(boolean)} set to
+ * {@code true} and transaction rollback counter cleanup.
+ */
+@RunWith(Parameterized.class)
+public class LocalContinuousQueryWithNodeFailureTest extends
GridCommonAbstractTest {
+ /** */
+ private static final int NODE_CNT = 3;
+
+ /** */
+ private static final int TX_THREADS = 10;
+
+ /** */
+ private static final int KEYS_PER_TX = 10;
+
+ /** */
+ @Parameterized.Parameter
+ public CacheMode cacheMode;
+
+ /** @return Cache modes the test is parameterized with: {@code REPLICATED} and {@code PARTITIONED}. */
+ @Parameterized.Parameters(name = "cacheMode={0}")
+ public static Object[] params() {
+ return new Object[] {REPLICATED, PARTITIONED};
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Adds a transactional, {@code FULL_SYNC} default cache in the parameterized cache mode, enables persistence
+     * for the default data region and installs a {@link StopNodeFailureHandler}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception {
+        // Storage: persistence is enabled for the default data region.
+        DataRegionConfiguration dfltRegionCfg = new DataRegionConfiguration();
+
+        dfltRegionCfg.setPersistenceEnabled(true);
+
+        DataStorageConfiguration storageCfg = new DataStorageConfiguration();
+
+        storageCfg.setDefaultDataRegionConfiguration(dfltRegionCfg);
+
+        // Cache under test.
+        CacheConfiguration<?, ?> ccfg = new CacheConfiguration<>(DEFAULT_CACHE_NAME);
+
+        ccfg.setCacheMode(cacheMode);
+        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
+        ccfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        // Backups are configured for the partitioned mode only.
+        if (cacheMode == PARTITIONED)
+            ccfg.setBackups(NODE_CNT - 1);
+
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        cfg.setDataStorageConfiguration(storageCfg);
+        cfg.setFailureHandler(new StopNodeFailureHandler());
+        cfg.setCacheConfiguration(ccfg);
+
+        return cfg;
+    }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ super.beforeTest();
+
+ // Persistence is enabled in getConfiguration(): clean the persistence directory before nodes are started.
+ cleanPersistenceDir();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ // Nodes are stopped first, then the persistence directory is cleaned.
+ stopAllGrids();
+
+ cleanPersistenceDir();
+
+ super.afterTest();
+ }
+
+    /**
+     * Checks that local and distributed continuous queries behave correctly when transaction rollback closes
+     * partition update counter gaps after a node failure.
+     * <p>
+     * Scenario:
+     * <ol>
+     *     <li>Transactional load is started on node 1.</li>
+     *     <li>A local and a distributed continuous query are registered on node 0.</li>
+     *     <li>Once both queries have received updates, the last node is failed.</li>
+     *     <li>Both queries must receive updates after the failure, and none of the remaining nodes may be
+     *     stopping or have a failure context.</li>
+     * </ol>
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testTransactionalCache() throws Exception {
+        startGrids(NODE_CNT).cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        AtomicBoolean stopTxLoad = new AtomicBoolean();
+        AtomicBoolean nodeFailed = new AtomicBoolean();
+
+        AtomicInteger updatesDistrBeforeFail = new AtomicInteger();
+        AtomicInteger updatesLocBeforeFail = new AtomicInteger();
+
+        AtomicInteger updatesDistrAfterFail = new AtomicInteger();
+        AtomicInteger updatesLocAfterFail = new AtomicInteger();
+
+        IgniteCache<Object, Object> cache1 = grid(1).cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Object, Object> cache0 = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        IgniteInternalFuture<?> txLoadFut = launchTxLoad(grid(1), cache1, stopTxLoad);
+
+        try (
+            QueryCursor<Cache.Entry<Object, Object>> locCur = cache0.query(
+                buildContinuousQuery(true, nodeFailed, updatesLocBeforeFail, updatesLocAfterFail));
+
+            QueryCursor<Cache.Entry<Object, Object>> distrCur = cache0.query(
+                buildContinuousQuery(false, nodeFailed, updatesDistrBeforeFail, updatesDistrAfterFail))
+        ) {
+            // Wait first and build the message afterwards: method arguments are evaluated left to right, so a
+            // message formatted inline in assertTrue() would report the counters as they were before the wait.
+            boolean updatedBeforeFail = waitForCondition(
+                () -> updatesLocBeforeFail.get() > 0 && updatesDistrBeforeFail.get() > 0,
+                getTestTimeout() / 2);
+
+            assertTrue(
+                String.format("Failed to receive expected updates before node failure [locUpdates=%s, distrUpdates=%s]",
+                    updatesLocBeforeFail.get(), updatesDistrBeforeFail.get()),
+                updatedBeforeFail
+            );
+
+            failNode(NODE_CNT - 1);
+
+            waitForTopology(NODE_CNT - 1);
+
+            // From now on listeners count events in the "after failure" counters.
+            nodeFailed.set(true);
+
+            boolean updatedAfterFail = waitForCondition(
+                () -> updatesLocAfterFail.get() > 0 && updatesDistrAfterFail.get() > 0,
+                getTestTimeout() / 2);
+
+            assertTrue(
+                String.format("Failed to receive expected updates after node failure [locUpdates=%s, distrUpdates=%s]",
+                    updatesLocAfterFail.get(), updatesDistrAfterFail.get()),
+                updatedAfterFail
+            );
+
+            // The remaining nodes must be neither stopping nor have a failure context.
+            for (int i : IntStream.range(0, NODE_CNT - 1).toArray()) {
+                IgniteEx grid = grid(i);
+
+                assertFalse("Grid " + i + " is stopping", grid.context().isStopping());
+                assertNull("Failure context is not null for grid " + i, grid.context().failure().failureContext());
+            }
+        }
+        finally {
+            // Stop the load and wait for its threads regardless of the test outcome.
+            stopTxLoad.set(true);
+
+            txLoadFut.get();
+        }
+    }
+
+    /**
+     * Starts {@link #TX_THREADS} threads that keep running pessimistic, repeatable-read transactions until the stop
+     * flag is set. Each transaction puts up to {@link #KEYS_PER_TX} random integer keys mapped to themselves.
+     *
+     * @param grid Node used to start transactions.
+     * @param cache Cache the values are put into.
+     * @param stopTxLoad Flag that stops the load once set to {@code true}.
+     * @return Future of the load threads.
+     */
+    private IgniteInternalFuture<?> launchTxLoad(
+        IgniteEx grid,
+        IgniteCache<Object, Object> cache,
+        AtomicBoolean stopTxLoad
+    ) {
+        Runnable txLoad = () -> {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (!stopTxLoad.get()) {
+                try (
+                    Transaction tx = grid.transactions().txStart(
+                        TransactionConcurrency.PESSIMISTIC,
+                        TransactionIsolation.REPEATABLE_READ)
+                ) {
+                    // Duplicate random keys are merged; the map is sorted, presumably to keep the key order
+                    // consistent between concurrent pessimistic transactions.
+                    Map<Integer, Object> vals = rnd.ints(KEYS_PER_TX)
+                        .boxed()
+                        .collect(Collectors.toMap(
+                            Function.identity(),
+                            Function.identity(),
+                            (first, second) -> first,
+                            TreeMap::new
+                        ));
+
+                    cache.putAll(vals);
+
+                    tx.commit();
+                }
+                catch (Exception ignore) {
+                    // No-op.
+                }
+            }
+        };
+
+        return GridTestUtils.runMultiThreadedAsync(txLoad, TX_THREADS, "test-tx");
+    }
+
+ /** */
+ private ContinuousQuery<Object, Object> buildContinuousQuery(
+ boolean locOnly,
+ AtomicBoolean nodeFailed,
+ AtomicInteger cntrBeforeFail,
+ AtomicInteger cntrAfterFail
+ ) {
+ return new ContinuousQuery<>()
+ .setLocal(locOnly)
+ .setLocalListener(new CacheEntryUpdatedListener<>() {
+ @Override public void onUpdated(Iterable iterable) throws
CacheEntryListenerException {
+ for (Object ignored : iterable) {
+ if (!nodeFailed.get())
+ cntrBeforeFail.incrementAndGet();
+ else
+ cntrAfterFail.incrementAndGet();
+ }
+
+ doSleep(1);
+ }
+ });
+ }
+
+    /**
+     * Simulates failure of the node with the given index through its {@link TcpDiscoverySpi}.
+     *
+     * @param lastNodeIdx Index of the node to fail.
+     */
+    private void failNode(int lastNodeIdx) {
+        TcpDiscoverySpi discoSpi = (TcpDiscoverySpi)grid(lastNodeIdx).configuration().getDiscoverySpi();
+
+        discoSpi.simulateNodeFailure();
+    }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index 53f3e13f98f..714d316f122 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -40,6 +40,7 @@ import
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
import
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
import
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
import
org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import
org.apache.ignite.internal.processors.cache.query.continuous.LocalContinuousQueryWithNodeFailureTest;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsQueryTest;
import
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsSystemViewTablesTest;
import
org.apache.ignite.internal.processors.query.DropTableAfterCteSqlQueryTest;
@@ -86,6 +87,7 @@ import org.junit.runners.Suite;
QueryEntityAliasesTest.class,
CacheContinuousQueryEntriesExpireTest.class,
DropTableAfterCteSqlQueryTest.class,
+ LocalContinuousQueryWithNodeFailureTest.class,
})
public class IgniteCacheQuerySelfTestSuite6 {
}