This is an automated email from the ASF dual-hosted git repository.

timoninmaxim pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b1a4a1fe72 IGNITE-23591 Fix entry and checkpoint locks orders in 
DataStreamer and update TTL (#11641)
7b1a4a1fe72 is described below

commit 7b1a4a1fe72110df4bf6623d87dd764b57302085
Author: Maksim Timonin <[email protected]>
AuthorDate: Thu Nov 7 11:45:44 2024 +0300

    IGNITE-23591 Fix entry and checkpoint locks orders in DataStreamer and 
update TTL (#11641)
---
 .../cache/distributed/dht/GridDhtCacheAdapter.java |   4 +
 .../processors/cache/query/ScanQueryIterator.java  |   5 +
 .../ConcurrentCheckpointAndUpdateTtlTest.java      | 164 +++++++++++++++++++++
 .../ignite/testsuites/IgniteCacheTestSuite10.java  |   2 +
 4 files changed, 175 insertions(+)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index b47447f87c6..5826820c068 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -1499,6 +1499,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                 GridCacheEntryEx entry = null;
 
                 try {
+                    ctx.shared().database().checkpointReadLock();
+
                     while (true) {
                         try {
                             entry = cache.entryEx(keys.get(i));
@@ -1524,6 +1526,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                 finally {
                     if (entry != null)
                         entry.touch();
+
+                    ctx.shared().database().checkpointReadUnlock();
                 }
             }
             catch (IgniteCheckedException e) {
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
index 777b19f3eab..24408ac9486 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/ScanQueryIterator.java
@@ -258,6 +258,8 @@ public final class ScanQueryIterator<K, V, R> extends 
GridCloseableIteratorAdapt
                     CacheDataRow tmp = row;
 
                     while (true) {
+                        cctx.shared().database().checkpointReadLock();
+
                         try {
                             GridCacheEntryEx entry = cache.entryEx(key);
 
@@ -272,6 +274,9 @@ public final class ScanQueryIterator<K, V, R> extends 
GridCloseableIteratorAdapt
                         catch (GridCacheEntryRemovedException ignore) {
                             tmp = null;
                         }
+                        finally {
+                            cctx.shared().database().checkpointReadUnlock();
+                        }
                     }
                 }
                 catch (IgniteCheckedException e) {
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ConcurrentCheckpointAndUpdateTtlTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ConcurrentCheckpointAndUpdateTtlTest.java
new file mode 100644
index 00000000000..bbd515cde7f
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/ConcurrentCheckpointAndUpdateTtlTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.TouchedExpiryPolicy;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.query.ScanQuery;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.DataRegionConfiguration;
+import org.apache.ignite.configuration.DataStorageConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.apache.logging.log4j.core.config.Configurator;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static 
org.apache.ignite.testframework.GridTestUtils.runMultiThreadedAsync;
+
+/** */
+@RunWith(Parameterized.class)
+public class ConcurrentCheckpointAndUpdateTtlTest extends 
GridCommonAbstractTest {
+    /** */
+    private static final int KEY = 0;
+
+    /** */
+    private final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+    /** */
+    @Parameterized.Parameter
+    public boolean updateWithDataStreamer;
+
+    /** */
+    @Parameterized.Parameter(1)
+    public boolean touchWithScanQuery;
+
+    /** */
+    @Parameterized.Parameter(2)
+    public CacheAtomicityMode mode;
+
+    /** */
+    @Parameterized.Parameters(name = "dataStreamer={0}, touchQuery={1}, 
cacheMode={2}")
+    public static Collection<Object[]> params() {
+        Collection<Object[]> params = new ArrayList<>();
+
+        for (CacheAtomicityMode cacheMode: CacheAtomicityMode.values()) {
+            for (boolean updateMode: new boolean[] {true, false}) {
+                for (boolean touchMode: new boolean[] {true, false})
+                    params.add(new Object[] { updateMode, touchMode, cacheMode 
});
+            }
+        }
+
+        return params;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
+
+        if (!cfg.isClientMode()) {
+            cfg.setDataStorageConfiguration(new DataStorageConfiguration()
+                    .setDefaultDataRegionConfiguration(new 
DataRegionConfiguration().setPersistenceEnabled(true)))
+                .setCacheConfiguration(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME)
+                    .setAtomicityMode(mode)
+                    .setBackups(1)
+                    .setExpiryPolicyFactory(TouchedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.SECONDS, 10))));
+        }
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        super.beforeTest();
+
+        cleanPersistenceDir();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTest() throws Exception {
+        super.afterTest();
+
+        stopAllGrids();
+
+        cleanPersistenceDir();
+    }
+
+    /** */
+    @Test
+    public void concurrentUpdateTouchCheckpointKey() throws Exception {
+        startGrids(2).cluster().state(ClusterState.ACTIVE);
+
+        IgniteEx cln = startClientGrid(2);
+
+        AtomicBoolean stop = new AtomicBoolean();
+
+        IgniteInternalFuture<?> updateFut = runMultiThreadedAsync(() -> {
+            while (!stop.get()) {
+                if (updateWithDataStreamer) {
+                    // Skip annoying warnings.
+                    Configurator.setLevel(DataStreamerImpl.class.getName(), 
org.apache.logging.log4j.Level.ERROR);
+
+                    try (IgniteDataStreamer<Integer, Integer> stream = 
cln.dataStreamer(DEFAULT_CACHE_NAME)) {
+                        stream.addData(KEY, rnd.nextInt());
+                    }
+                }
+                else
+                    cln.cache(DEFAULT_CACHE_NAME).put(KEY, rnd.nextInt());
+            }
+        }, 1, "update");
+
+        IgniteInternalFuture<?> touchFut = runMultiThreadedAsync(() -> {
+            while (!stop.get()) {
+                if (touchWithScanQuery)
+                    cln.cache(DEFAULT_CACHE_NAME).query(new 
ScanQuery<>()).getAll();
+                else
+                    cln.cache(DEFAULT_CACHE_NAME).get(KEY);
+            }
+        }, 1, "touch");
+
+        IgniteInternalFuture<?> cpFut = runMultiThreadedAsync(() -> {
+            for (int i = 0; i < 1_000; i++) {
+                try {
+                    forceCheckpoint(F.asList(grid(0), grid(1)));
+                }
+                catch (Exception e) {
+                    // No-op.
+                }
+            }
+
+            stop.set(true);
+        }, 1, "checkpoint");
+
+        GridTestUtils.waitForAllFutures(updateFut, touchFut, cpFut);
+    }
+}
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
index 27dbaeccad0..826716fb9dc 100755
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite10.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.CacheDeferredDeleteQueueTest;
 import 
org.apache.ignite.internal.processors.cache.CacheDeferredDeleteSanitySelfTest;
 import 
org.apache.ignite.internal.processors.cache.CachePutEventListenerErrorSelfTest;
 import org.apache.ignite.internal.processors.cache.CacheTxFastFinishTest;
+import 
org.apache.ignite.internal.processors.cache.ConcurrentCheckpointAndUpdateTtlTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheAtomicUsersAffinityMapperSelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheClearLocallySelfTest;
 import 
org.apache.ignite.internal.processors.cache.GridCacheColocatedTxStoreExceptionSelfTest;
@@ -204,6 +205,7 @@ public class IgniteCacheTestSuite10 {
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheReturnValueTransferSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, GridCacheSlowTxWarnTest.class, 
ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheTtlManagerLoadTest.class, ignoredTests);
+        GridTestUtils.addTestIfNeeded(suite, 
ConcurrentCheckpointAndUpdateTtlTest.class, ignoredTests);
 
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheAtomicUsersAffinityMapperSelfTest.class, ignoredTests);
         GridTestUtils.addTestIfNeeded(suite, 
GridCacheTxUsersAffinityMapperSelfTest.class, ignoredTests);

Reply via email to