This is an automated email from the ASF dual-hosted git repository.

alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 510fa7bd02e IGNITE-23971 Fix striped queue overflow in ScanQuery for caches with touched or accessed TTL expire policy - Fixes #11736.
510fa7bd02e is described below

commit 510fa7bd02e3ef9e782d6ce5ae610b8cab45a586
Author: Sergey Korotkov <[email protected]>
AuthorDate: Wed Dec 25 09:09:17 2024 +0300

    IGNITE-23971 Fix striped queue overflow in ScanQuery for caches with touched or accessed TTL expire policy - Fixes #11736.
    
    Signed-off-by: Aleksey Plekhanov <[email protected]>
---
 .../cache/distributed/dht/GridDhtCacheAdapter.java |  11 +-
 .../processors/cache/query/ScanQueryIterator.java  |   8 +-
 .../cache/query/ScanQueryUpdateTtlTest.java        | 115 +++++++++++++++++++++
 .../IgniteBinaryCacheQueryTestSuite4.java          |   3 +
 4 files changed, 130 insertions(+), 7 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5826820c068..0a545287ee9 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1365,13 +1365,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
         @Nullable UUID srcNodeId
     ) {
         if (!F.isEmpty(expiryPlc.entries())) {
+            Map<KeyCacheObject, GridCacheVersion> entries = 
Map.copyOf(expiryPlc.entries());
+
+            Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, 
GridCacheVersion>>> rdrs =
+                expiryPlc.readers() != null ? Map.copyOf(expiryPlc.readers()) 
: null;
+
             ctx.closures().runLocalSafe(new GridPlainRunnable() {
                 @SuppressWarnings({"ForLoopReplaceableByForEach"})
                 @Override public void run() {
-                    Map<KeyCacheObject, GridCacheVersion> entries = 
expiryPlc.entries();
-
-                    assert entries != null && !entries.isEmpty();
-
                     Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new 
HashMap<>();
 
                     for (Map.Entry<KeyCacheObject, GridCacheVersion> e : 
entries.entrySet()) {
@@ -1416,8 +1417,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                         }
                     }
 
-                    Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, 
GridCacheVersion>>> rdrs = expiryPlc.readers();
-
                     if (rdrs != null) {
                         assert !rdrs.isEmpty();
 
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
index 76187359bf1..7f8f9e48f91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
@@ -41,6 +41,9 @@ import org.jetbrains.annotations.Nullable;
 
 /** */
 public final class ScanQueryIterator<K, V, R> extends 
AbstractScanQueryIterator<K, V, R> {
+    /** */
+    public static final int EXPIRE_ENTRIES_FLUSH_CNT = 100;
+
     /** */
     private static final long serialVersionUID = 0L;
 
@@ -182,8 +185,11 @@ public final class ScanQueryIterator<K, V, R> extends 
AbstractScanQueryIterator<
                     val = null;
                 }
 
-                if (dht != null && expiryPlc.readyToFlush(100))
+                if (dht != null && 
expiryPlc.readyToFlush(EXPIRE_ENTRIES_FLUSH_CNT)) {
                     dht.sendTtlUpdateRequest(expiryPlc);
+
+                    expiryPlc.reset();
+                }
             }
             else
                 val = row.value();
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryUpdateTtlTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryUpdateTtlTest.java
new file mode 100644
index 00000000000..be3d2381f6c
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryUpdateTtlTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.TouchedExpiryPolicy;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import 
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static 
org.apache.ignite.internal.processors.cache.query.ScanQueryIterator.EXPIRE_ENTRIES_FLUSH_CNT;
+
+/** */
+public class ScanQueryUpdateTtlTest extends GridCommonAbstractTest {
+    /** */
+    private static final int KEYS = EXPIRE_ENTRIES_FLUSH_CNT * 3;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (!cfg.isClientMode()) {
+            cfg.setCacheConfiguration(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                .setBackups(1)
+                .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.MINUTES, 60))));
+        }
+
+        cfg.setCommunicationSpi(new CheckingCommunicationSpi());
+
+        return cfg;
+    }
+
+    /** */
+    @Test
+    public void testScanQueryIteratorExpireEntriesFlush() throws Exception {
+        try (IgniteEx ignored = startGrids(2)) {
+            try (IgniteEx cln = startClientGrid(2)) {
+                for (int i = 0; i < KEYS; i++)
+                    cln.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+                cln.cache(DEFAULT_CACHE_NAME).query(new 
ScanQuery<>()).forEach((v) -> {});
+            }
+
+            
grid(0).context().pools().getStripedExecutorService().awaitComplete();
+            
grid(1).context().pools().getStripedExecutorService().awaitComplete();
+
+            assertTrue("Each key must be sent only once", 
CheckingCommunicationSpi.keyCnt.values().stream().allMatch(c -> c == 1));
+
+            // Check here for (EXPIRE_ENTRIES_FLUSH_CNT + 1) since 
CacheExpiryPolicy::readyToFlush method
+            // checks as (entries.size() > EXPIRE_ENTRIES_FLUSH_CNT) before 
flush.
+            assertTrue("Single GridCacheTtlUpdateRequest must be sent with no 
more then maximum allowed keys " +
+                    "[maxAllowed=" + (EXPIRE_ENTRIES_FLUSH_CNT + 1) +
+                    ", maxActual=" + CheckingCommunicationSpi.maxKeyBatchCnt + 
"]",
+                CheckingCommunicationSpi.maxKeyBatchCnt <= 
EXPIRE_ENTRIES_FLUSH_CNT + 1);
+
+            assertEquals("All keys must be sent", KEYS, 
CheckingCommunicationSpi.keyCnt.keySet().size());
+        }
+    }
+
+    /** */
+    private static class CheckingCommunicationSpi extends 
TestRecordingCommunicationSpi {
+        /** */
+        private static int maxKeyBatchCnt = 0;
+
+        /** */
+        private static final Map<KeyCacheObject, Integer> keyCnt = new 
ConcurrentHashMap<>(U.capacity(KEYS));
+
+        /** {@inheritDoc} */
+        @Override public void sendMessage(ClusterNode node, Message msg, 
IgniteInClosure<IgniteException> ackC)
+                    throws IgniteSpiException {
+            if ((msg instanceof GridIoMessage) && 
(((GridIoMessage)msg).message() instanceof GridCacheTtlUpdateRequest)) {
+                GridCacheTtlUpdateRequest ttlUpdReq = 
(GridCacheTtlUpdateRequest)((GridIoMessage)msg).message();
+
+                if (ttlUpdReq.keys().size() > maxKeyBatchCnt)
+                    maxKeyBatchCnt = ttlUpdReq.keys().size();
+
+                for (KeyCacheObject key : ttlUpdReq.keys())
+                    keyCnt.put(key, keyCnt.getOrDefault(key, 0) + 1);
+            }
+
+            super.sendMessage(node, msg, ackC);
+        }
+    }
+}
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
index 4d598517c69..7f5b7e29b62 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
@@ -25,6 +25,7 @@ import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQ
 import 
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxMultiNodeBasicTest;
 import 
org.apache.ignite.internal.processors.cache.query.ScanQueryTransactionIsolationTest;
 import 
org.apache.ignite.internal.processors.cache.query.ScanQueryTransactionsUnsupportedModesTest;
+import 
org.apache.ignite.internal.processors.cache.query.ScanQueryUpdateTtlTest;
 import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest;
 import 
org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest;
 import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
@@ -101,6 +102,8 @@ import org.junit.runners.Suite;
 
     ScanQueryTransactionsUnsupportedModesTest.class,
     ScanQueryTransactionIsolationTest.class,
+
+    ScanQueryUpdateTtlTest.class,
 })
 public class IgniteBinaryCacheQueryTestSuite4 {
     /** Setup lazy mode default. */

Reply via email to