This is an automated email from the ASF dual-hosted git repository.

shishkovilja pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new d40158d757b IGNITE-28750 Fix AssertionError in a local CQ during TX 
rollback caused by node failure (#13216)
d40158d757b is described below

commit d40158d757b62ea41dc4d45c1fe8327b1d1f6019
Author: oleg-vlsk <[email protected]>
AuthorDate: Tue Jun 9 16:57:28 2026 +1000

    IGNITE-28750 Fix AssertionError in a local CQ during TX rollback caused by 
node failure (#13216)
---
 .../continuous/CacheContinuousQueryHandler.java    |   4 +
 .../continuous/CacheContinuousQueryListener.java   |   3 +
 .../continuous/CacheContinuousQueryManager.java    |  14 +-
 .../LocalContinuousQueryWithNodeFailureTest.java   | 246 +++++++++++++++++++++
 .../testsuites/IgniteCacheQuerySelfTestSuite6.java |   2 +
 5 files changed, 266 insertions(+), 3 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 200a40cb796..28f941a4cba 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -767,6 +767,10 @@ public class CacheContinuousQueryHandler<K, V> implements 
GridContinuousHandler
             @Override public boolean isPrimaryOnly() {
                 return locOnly && !skipPrimaryCheck;
             }
+
+            @Override public boolean isLocalOnly() {
+                return locOnly;
+            }
         };
 
         CacheContinuousQueryManager mgr = manager(ctx);
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
index cb15288a52b..faa1e53d035 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryListener.java
@@ -131,4 +131,7 @@ public interface CacheContinuousQueryListener<K, V> {
      * @return {@code True} if this listener should be called on events on 
primary partitions only.
      */
     public boolean isPrimaryOnly();
+
+    /** @return {@code true} if this listener belongs to a local continuous 
query. */
+    public boolean isLocalOnly();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
index ed362129e52..1680b9619a2 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryManager.java
@@ -248,15 +248,23 @@ public class CacheContinuousQueryManager<K, V> extends 
GridCacheManagerAdapter<K
      * @param part Partition number.
      * @param cntr Update counter.
      * @param topVer Topology version.
+     * @param primary Primary partition flag.
      * @return Context.
      */
-    @Nullable public CounterSkipContext skipUpdateCounter(@Nullable 
CounterSkipContext skipCtx,
+    @Nullable public CounterSkipContext skipUpdateCounter(
+        @Nullable CounterSkipContext skipCtx,
         int part,
         long cntr,
         AffinityTopologyVersion topVer,
-        boolean primary) {
-        for (CacheContinuousQueryListener lsnr : lsnrs.values())
+        boolean primary
+    ) {
+        for (CacheContinuousQueryListener<?, ?> lsnr : lsnrs.values()) {
+            // Local CQs notify listeners directly and do not use 
skipped-counter mechanism
+            if (lsnr.isLocalOnly())
+                continue;
+
             skipCtx = lsnr.skipUpdateCounter(cctx, skipCtx, part, cntr, 
topVer, primary);
+        }
 
         return skipCtx;
     }
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/LocalContinuousQueryWithNodeFailureTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/LocalContinuousQueryWithNodeFailureTest.java
new file mode 100644
index 00000000000..9f4ce53072b
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/continuous/LocalContinuousQueryWithNodeFailureTest.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query.continuous;
+
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import javax.cache.Cache;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CacheWriteSynchronizationMode;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cache.query.QueryCursor;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.failure.StopNodeFailureHandler;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionConcurrency;
+import org.apache.ignite.transactions.TransactionIsolation;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.apache.ignite.cache.CacheMode.PARTITIONED;
+import static org.apache.ignite.cache.CacheMode.REPLICATED;
+import static org.apache.ignite.testframework.GridTestUtils.waitForCondition;
+
+/**
+ * Tests interaction between local continuous queries created with {@link 
ContinuousQuery#setLocal(boolean)} set to
+ * {@code true} and transaction rollback counter cleanup.
+ */
+@RunWith(Parameterized.class)
+public class LocalContinuousQueryWithNodeFailureTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final int NODE_CNT = 3;
+
+    /** */
+    private static final int TX_THREADS = 10;
+
+    /** */
+    private static final int KEYS_PER_TX = 10;
+
+    /** */
+    @Parameterized.Parameter
+    public CacheMode cacheMode;
+
+    /** */
+    @Parameterized.Parameters(name = "cacheMode={0}")
+    public static Object[] params() {
+        return new Object[] {REPLICATED, PARTITIONED};
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        CacheConfiguration<?, ?> cacheCfg = new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+            .setCacheMode(cacheMode)
+            
.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC)
+            .setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
+
+        if (cacheMode == PARTITIONED)
+            cacheCfg.setBackups(NODE_CNT - 1);
+
+        return super.getConfiguration(igniteInstanceName)
+            .setDataStorageConfiguration(new DataStorageConfiguration()
+                .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration()
+                    .setPersistenceEnabled(true)))
+            .setFailureHandler(new StopNodeFailureHandler())
+            .setCacheConfiguration(cacheCfg);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        stopAllGrids();
+
+        cleanPersistenceDir();
+
+        super.afterTest();
+    }
+
+    /**
+     * Checks that local and distributed continuous queries behave correctly 
when transaction rollback closes
+     * partition update counter gaps after a node failure.
+     */
+    @Test
+    public void testTransactionalCache() throws Exception {
+        startGrids(NODE_CNT).cluster().state(ClusterState.ACTIVE);
+
+        awaitPartitionMapExchange();
+
+        AtomicBoolean stopTxLoad = new AtomicBoolean();
+        AtomicBoolean nodeFailed = new AtomicBoolean();
+
+        AtomicInteger updatesDistrBeforeFail = new AtomicInteger();
+        AtomicInteger updatesLocBeforeFail = new AtomicInteger();
+
+        AtomicInteger updatesDistrAfterFail = new AtomicInteger();
+        AtomicInteger updatesLocAfterFail = new AtomicInteger();
+
+        IgniteCache<Object, Object> cache1 = grid(1).cache(DEFAULT_CACHE_NAME);
+        IgniteCache<Object, Object> cache0 = grid(0).cache(DEFAULT_CACHE_NAME);
+
+        IgniteInternalFuture<?> txLoadFut = launchTxLoad(grid(1), cache1, 
stopTxLoad);
+
+        try (
+            QueryCursor<Cache.Entry<Object, Object>> locCur = cache0.query(
+                buildContinuousQuery(true, nodeFailed, updatesLocBeforeFail, 
updatesLocAfterFail));
+
+            QueryCursor<Cache.Entry<Object, Object>> distrCur = cache0.query(
+                buildContinuousQuery(false, nodeFailed, 
updatesDistrBeforeFail, updatesDistrAfterFail))
+        ) {
+            assertTrue(
+                String.format("Failed to receive expected updates before node 
failure [locUpdates=%s, distrUpdates=%s]",
+                    updatesLocBeforeFail.get(), updatesDistrBeforeFail.get()),
+                waitForCondition(() -> updatesLocBeforeFail.get() > 0 && 
updatesDistrBeforeFail.get() > 0,
+                    getTestTimeout() / 2)
+            );
+
+            failNode(NODE_CNT - 1);
+
+            waitForTopology(NODE_CNT - 1);
+
+            nodeFailed.set(true);
+
+            assertTrue(
+                String.format("Failed to receive expected updates after node 
failure [locUpdates=%s, distrUpdates=%s]",
+                    updatesLocAfterFail.get(), updatesDistrAfterFail.get()),
+                waitForCondition(() -> updatesLocAfterFail.get() > 0 && 
updatesDistrAfterFail.get() > 0,
+                    getTestTimeout() / 2)
+            );
+
+            for (int i : IntStream.range(0, NODE_CNT - 1).toArray()) {
+                IgniteEx grid = grid(i);
+
+                assertFalse("Grid " + i + " is stopping", 
grid.context().isStopping());
+                assertNull("Failure context is not null for grid " + i, 
grid.context().failure().failureContext());
+            }
+        }
+        finally {
+            stopTxLoad.set(true);
+
+            txLoadFut.get();
+        }
+    }
+
+    /** */
+    private IgniteInternalFuture<?> launchTxLoad(
+        IgniteEx grid,
+        IgniteCache<Object, Object> cache,
+        AtomicBoolean stopTxLoad
+    ) {
+        return GridTestUtils.runMultiThreadedAsync(() -> {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (!stopTxLoad.get()) {
+                try (
+                    Transaction tx = grid.transactions().txStart(
+                        TransactionConcurrency.PESSIMISTIC,
+                        TransactionIsolation.REPEATABLE_READ)
+                ) {
+                    Map<Integer, Object> vals = rnd.ints()
+                        .limit(KEYS_PER_TX)
+                        .boxed()
+                        .collect(Collectors.toMap(
+                            Function.identity(),
+                            Function.identity(),
+                            (a, b) -> a,
+                            TreeMap::new
+                        ));
+
+                    cache.putAll(vals);
+
+                    tx.commit();
+                }
+                catch (Exception ignore) {
+                    // No-op.
+                }
+            }
+        }, TX_THREADS, "test-tx");
+    }
+
+    /** */
+    private ContinuousQuery<Object, Object> buildContinuousQuery(
+        boolean locOnly,
+        AtomicBoolean nodeFailed,
+        AtomicInteger cntrBeforeFail,
+        AtomicInteger cntrAfterFail
+    ) {
+        return new ContinuousQuery<>()
+            .setLocal(locOnly)
+            .setLocalListener(new CacheEntryUpdatedListener<>() {
+                @Override public void onUpdated(Iterable iterable) throws 
CacheEntryListenerException {
+                    for (Object ignored : iterable) {
+                        if (!nodeFailed.get())
+                            cntrBeforeFail.incrementAndGet();
+                        else
+                            cntrAfterFail.incrementAndGet();
+                    }
+
+                    doSleep(1);
+                }
+            });
+    }
+
+    /** */
+    private void failNode(int lastNodeIdx) {
+        
((TcpDiscoverySpi)grid(lastNodeIdx).configuration().getDiscoverySpi()).simulateNodeFailure();
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
index 53f3e13f98f..714d316f122 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteCacheQuerySelfTestSuite6.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheCon
 import 
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicOneNodeTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.GridCacheContinuousQueryReplicatedAtomicSelfTest;
 import 
org.apache.ignite.internal.processors.cache.query.continuous.IgniteCacheContinuousQueryClientTest;
+import 
org.apache.ignite.internal.processors.cache.query.continuous.LocalContinuousQueryWithNodeFailureTest;
 import 
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsQueryTest;
 import 
org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsSystemViewTablesTest;
 import 
org.apache.ignite.internal.processors.query.DropTableAfterCteSqlQueryTest;
@@ -86,6 +87,7 @@ import org.junit.runners.Suite;
     QueryEntityAliasesTest.class,
     CacheContinuousQueryEntriesExpireTest.class,
     DropTableAfterCteSqlQueryTest.class,
+    LocalContinuousQueryWithNodeFailureTest.class,
 })
 public class IgniteCacheQuerySelfTestSuite6 {
 }

Reply via email to