This is an automated email from the ASF dual-hosted git repository.
alexpl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 510fa7bd02e IGNITE-23971 Fix striped queue overflow in ScanQuery for
caches with touched or accessed TTL expire policy - Fixes #11736.
510fa7bd02e is described below
commit 510fa7bd02e3ef9e782d6ce5ae610b8cab45a586
Author: Sergey Korotkov <[email protected]>
AuthorDate: Wed Dec 25 09:09:17 2024 +0300
IGNITE-23971 Fix striped queue overflow in ScanQuery for caches with
touched or accessed TTL expire policy - Fixes #11736.
Signed-off-by: Aleksey Plekhanov <[email protected]>
---
.../cache/distributed/dht/GridDhtCacheAdapter.java | 11 +-
.../processors/cache/query/ScanQueryIterator.java | 8 +-
.../cache/query/ScanQueryUpdateTtlTest.java | 115 +++++++++++++++++++++
.../IgniteBinaryCacheQueryTestSuite4.java | 3 +
4 files changed, 130 insertions(+), 7 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 5826820c068..0a545287ee9 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1365,13 +1365,14 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
@Nullable UUID srcNodeId
) {
if (!F.isEmpty(expiryPlc.entries())) {
+ Map<KeyCacheObject, GridCacheVersion> entries =
Map.copyOf(expiryPlc.entries());
+
+ Map<UUID, Collection<IgniteBiTuple<KeyCacheObject,
GridCacheVersion>>> rdrs =
+ expiryPlc.readers() != null ? Map.copyOf(expiryPlc.readers())
: null;
+
ctx.closures().runLocalSafe(new GridPlainRunnable() {
@SuppressWarnings({"ForLoopReplaceableByForEach"})
@Override public void run() {
- Map<KeyCacheObject, GridCacheVersion> entries =
expiryPlc.entries();
-
- assert entries != null && !entries.isEmpty();
-
Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new
HashMap<>();
for (Map.Entry<KeyCacheObject, GridCacheVersion> e :
entries.entrySet()) {
@@ -1416,8 +1417,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends
GridDistributedCacheAdap
}
}
- Map<UUID, Collection<IgniteBiTuple<KeyCacheObject,
GridCacheVersion>>> rdrs = expiryPlc.readers();
-
if (rdrs != null) {
assert !rdrs.isEmpty();
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
index 76187359bf1..7f8f9e48f91 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
@@ -41,6 +41,9 @@ import org.jetbrains.annotations.Nullable;
/** */
public final class ScanQueryIterator<K, V, R> extends
AbstractScanQueryIterator<K, V, R> {
+ /** */
+ public static final int EXPIRE_ENTRIES_FLUSH_CNT = 100;
+
/** */
private static final long serialVersionUID = 0L;
@@ -182,8 +185,11 @@ public final class ScanQueryIterator<K, V, R> extends
AbstractScanQueryIterator<
val = null;
}
- if (dht != null && expiryPlc.readyToFlush(100))
+ if (dht != null &&
expiryPlc.readyToFlush(EXPIRE_ENTRIES_FLUSH_CNT)) {
dht.sendTtlUpdateRequest(expiryPlc);
+
+ expiryPlc.reset();
+ }
}
else
val = row.value();
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryUpdateTtlTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryUpdateTtlTest.java
new file mode 100644
index 00000000000..be3d2381f6c
--- /dev/null
+++
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/query/ScanQueryUpdateTtlTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.query;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.TouchedExpiryPolicy;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import
org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.IgniteSpiException;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.junit.Test;
+
+import static
org.apache.ignite.internal.processors.cache.query.ScanQueryIterator.EXPIRE_ENTRIES_FLUSH_CNT;
+
+/** */
+public class ScanQueryUpdateTtlTest extends GridCommonAbstractTest {
+ /** */
+ private static final int KEYS = EXPIRE_ENTRIES_FLUSH_CNT * 3;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String
igniteInstanceName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+ if (!cfg.isClientMode()) {
+ cfg.setCacheConfiguration(new
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+ .setBackups(1)
+ .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new
Duration(TimeUnit.MINUTES, 60))));
+ }
+
+ cfg.setCommunicationSpi(new CheckingCommunicationSpi());
+
+ return cfg;
+ }
+
+ /** */
+ @Test
+ public void testScanQueryIteratorExpireEntriesFlush() throws Exception {
+ try (IgniteEx ignored = startGrids(2)) {
+ try (IgniteEx cln = startClientGrid(2)) {
+ for (int i = 0; i < KEYS; i++)
+ cln.cache(DEFAULT_CACHE_NAME).put(i, i);
+
+ cln.cache(DEFAULT_CACHE_NAME).query(new
ScanQuery<>()).forEach((v) -> {});
+ }
+
+
grid(0).context().pools().getStripedExecutorService().awaitComplete();
+
grid(1).context().pools().getStripedExecutorService().awaitComplete();
+
+ assertTrue("Each key must be sent only once",
CheckingCommunicationSpi.keyCnt.values().stream().allMatch(c -> c == 1));
+
+ // Check here for (EXPIRE_ENTRIES_FLUSH_CNT + 1) since
CacheExpiryPolicy::readyToFlush method
+ // checks as (entries.size() > EXPIRE_ENTRIES_FLUSH_CNT) before
flush.
+ assertTrue("Single GridCacheTtlUpdateRequest must be sent with no
more then maximum allowed keys " +
+ "[maxAllowed=" + (EXPIRE_ENTRIES_FLUSH_CNT + 1) +
+ ", maxActual=" + CheckingCommunicationSpi.maxKeyBatchCnt +
"]",
+ CheckingCommunicationSpi.maxKeyBatchCnt <=
EXPIRE_ENTRIES_FLUSH_CNT + 1);
+
+ assertEquals("All keys must be sent", KEYS,
CheckingCommunicationSpi.keyCnt.keySet().size());
+ }
+ }
+
+ /** */
+ private static class CheckingCommunicationSpi extends
TestRecordingCommunicationSpi {
+ /** */
+ private static int maxKeyBatchCnt = 0;
+
+ /** */
+ private static final Map<KeyCacheObject, Integer> keyCnt = new
ConcurrentHashMap<>(U.capacity(KEYS));
+
+ /** {@inheritDoc} */
+ @Override public void sendMessage(ClusterNode node, Message msg,
IgniteInClosure<IgniteException> ackC)
+ throws IgniteSpiException {
+ if ((msg instanceof GridIoMessage) &&
(((GridIoMessage)msg).message() instanceof GridCacheTtlUpdateRequest)) {
+ GridCacheTtlUpdateRequest ttlUpdReq =
(GridCacheTtlUpdateRequest)((GridIoMessage)msg).message();
+
+ if (ttlUpdReq.keys().size() > maxKeyBatchCnt)
+ maxKeyBatchCnt = ttlUpdReq.keys().size();
+
+ for (KeyCacheObject key : ttlUpdReq.keys())
+ keyCnt.put(key, keyCnt.getOrDefault(key, 0) + 1);
+ }
+
+ super.sendMessage(node, msg, ackC);
+ }
+ }
+}
diff --git
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
index 4d598517c69..7f5b7e29b62 100644
---
a/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
+++
b/modules/indexing/src/test/java/org/apache/ignite/testsuites/IgniteBinaryCacheQueryTestSuite4.java
@@ -25,6 +25,7 @@ import
org.apache.ignite.internal.processors.cache.distributed.near.IgniteCacheQ
import
org.apache.ignite.internal.processors.cache.distributed.replicated.GridCacheReplicatedTxMultiNodeBasicTest;
import
org.apache.ignite.internal.processors.cache.query.ScanQueryTransactionIsolationTest;
import
org.apache.ignite.internal.processors.cache.query.ScanQueryTransactionsUnsupportedModesTest;
+import
org.apache.ignite.internal.processors.cache.query.ScanQueryUpdateTtlTest;
import org.apache.ignite.internal.processors.query.DmlBatchSizeDeadlockTest;
import
org.apache.ignite.internal.processors.query.IgniteSqlCreateTableTemplateTest;
import org.apache.ignite.internal.processors.query.LocalQueryLazyTest;
@@ -101,6 +102,8 @@ import org.junit.runners.Suite;
ScanQueryTransactionsUnsupportedModesTest.class,
ScanQueryTransactionIsolationTest.class,
+
+ ScanQueryUpdateTtlTest.class,
})
public class IgniteBinaryCacheQueryTestSuite4 {
/** Setup lazy mode default. */