This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 04ae61f  IGNITE-13382 DurableBackgroundTask can abandon incomplete 
task - Fixes #8182.
04ae61f is described below

commit 04ae61f668b65a6f7881f090c9481cdf3ccad25c
Author: makedonskaya <m.a.makedonsk...@gmail.com>
AuthorDate: Tue Sep 1 17:11:07 2020 +0300

    IGNITE-13382 DurableBackgroundTask can abandon incomplete task - Fixes 
#8182.
    
    Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com>
---
 .../pendingtask/DurableBackgroundTask.java         |  17 ++++
 .../localtask/DurableBackgroundTasksProcessor.java |  35 ++++++-
 .../h2/DurableBackgroundCleanupIndexTreeTask.java  |  19 ++++
 .../db/LongDestroyDurableBackgroundTaskTest.java   | 105 ++++++++++++++++++++-
 4 files changed, 168 insertions(+), 8 deletions(-)

diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
index a9e8d9e..355f624 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java
@@ -38,4 +38,21 @@ public interface DurableBackgroundTask extends Serializable {
      * @param ctx Grid kernal context.
      */
     public void execute(GridKernalContext ctx);
+
+    /**
+     * Method that marks task as complete.
+     */
+    public void complete();
+
+    /**
+     * Method that return completion flag.
+     *
+     * @return flag that task completed.
+     */
+    public boolean isCompleted();
+
+    /**
+     * Callback for task cancellation.
+     */
+    public void onCancel();
 }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
index 05d82bf..559edb8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
@@ -26,6 +26,8 @@ import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.processors.GridProcessorAdapter;
+import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
 import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
@@ -42,7 +44,8 @@ import static 
org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
  * Processor that is responsible for durable background tasks that are 
executed on local node
  * and should be continued even after node restart.
  */
-public class DurableBackgroundTasksProcessor extends GridProcessorAdapter 
implements MetastorageLifecycleListener {
+public class DurableBackgroundTasksProcessor extends GridProcessorAdapter 
implements MetastorageLifecycleListener,
+        DbCheckpointListener {
     /** Prefix for metastorage keys for durable background tasks. */
     private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = 
"durable-background-task-";
 
@@ -74,8 +77,10 @@ public class DurableBackgroundTasksProcessor extends 
GridProcessorAdapter implem
     private void asyncDurableBackgroundTasksExecution() {
         assert durableBackgroundTasks != null;
 
-        for (DurableBackgroundTask task : durableBackgroundTasks.values())
-            asyncDurableBackgroundTaskExecute(task, false);
+        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
+            if (!task.isCompleted())
+                asyncDurableBackgroundTaskExecute(task, false);
+        }
     }
 
     /**
@@ -93,9 +98,9 @@ public class DurableBackgroundTasksProcessor extends 
GridProcessorAdapter implem
 
                     task.execute(ctx);
 
-                    log.info("Execution of durable background task completed: 
" + task.shortName());
+                    task.complete();
 
-                    removeDurableBackgroundTask(task);
+                    log.info("Execution of durable background task completed: 
" + task.shortName());
                 }
                 catch (Throwable e) {
                     log.error("Could not execute durable background task: " + 
task.shortName(), e);
@@ -172,6 +177,8 @@ public class DurableBackgroundTasksProcessor extends 
GridProcessorAdapter implem
             }
         }
 
+        
((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+
         this.metastorage = metastorage;
     }
 
@@ -262,4 +269,22 @@ public class DurableBackgroundTasksProcessor extends 
GridProcessorAdapter implem
 
         asyncDurableBackgroundTaskExecute(task, false);
     }
+
+    /** {@inheritDoc} */
+    @Override public void onMarkCheckpointBegin(Context ctx) {
+        for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
+            if (task.isCompleted())
+                removeDurableBackgroundTask(task);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
+
+    /** {@inheritDoc} */
+    @Override public void beforeCheckpointBegin(Context ctx) {
+        /* No op. */
+    }
 }
diff --git 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java
 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java
index ba40a8c..23e9af7 100644
--- 
a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java
+++ 
b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java
@@ -48,6 +48,9 @@ public class DurableBackgroundCleanupIndexTreeTask implements 
DurableBackgroundT
     private transient List<H2Tree> trees;
 
     /** */
+    private transient volatile boolean completed;
+
+    /** */
     private String cacheGrpName;
 
     /** */
@@ -73,6 +76,7 @@ public class DurableBackgroundCleanupIndexTreeTask implements 
DurableBackgroundT
     ) {
         this.rootPages = rootPages;
         this.trees = trees;
+        this.completed = false;
         this.cacheGrpName = cacheGrpName;
         this.cacheName = cacheName;
         this.schemaName = schemaName;
@@ -168,6 +172,21 @@ public class DurableBackgroundCleanupIndexTreeTask 
implements DurableBackgroundT
     }
 
     /** {@inheritDoc} */
+    @Override public void complete() {
+        completed = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isCompleted() {
+        return completed;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onCancel() {
+        trees = null;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(DurableBackgroundCleanupIndexTreeTask.class, this);
     }
diff --git 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
index c847031..fe84b78 100644
--- 
a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
+++ 
b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.internal.processors.cache.persistence.db;
 
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
@@ -46,6 +47,12 @@ import org.apache.ignite.internal.metric.IoStatisticsHolder;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
+import 
org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener;
+import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
+import 
org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag;
 import 
org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
@@ -144,6 +151,9 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
     private H2TreeIndex.H2TreeFactory regularH2TreeFactory;
 
     /** */
+    private DurableBackgroundTaskTestListener durableBackgroundTaskTestLsnr;
+
+    /** */
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         return super.getConfiguration(igniteInstanceName)
             .setDataStorageConfiguration(
@@ -189,6 +199,8 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
 
         cleanPersistenceDir();
 
+        durableBackgroundTaskTestLsnr = null;
+
         super.afterTest();
     }
 
@@ -217,7 +229,7 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
 
         int nodeCnt = NODES_COUNT;
 
-        Ignite ignite = prepareAndPopulateCluster(nodeCnt, multicolumn);
+        Ignite ignite = prepareAndPopulateCluster(nodeCnt, multicolumn, false);
 
         Ignite aliveNode = grid(ALWAYS_ALIVE_NODE_NUM);
 
@@ -428,9 +440,18 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
      * @return Ignite instance.
      * @throws Exception If failed.
      */
-    private IgniteEx prepareAndPopulateCluster(int nodeCnt, boolean 
multicolumn) throws Exception {
+    private IgniteEx prepareAndPopulateCluster(int nodeCnt, boolean 
multicolumn, boolean createLsnr) throws Exception {
         IgniteEx ignite = startGrids(nodeCnt);
 
+        if (createLsnr) {
+            GridCacheSharedContext ctx = ignite.context().cache().context();
+
+            durableBackgroundTaskTestLsnr = new 
DurableBackgroundTaskTestListener(ctx.database().metaStorage());
+
+            ((GridCacheDatabaseSharedManager) ctx.cache().context().database())
+                    .addCheckpointListener(durableBackgroundTaskTestLsnr);
+        }
+
         ignite.cluster().active(true);
 
         ignite.cluster().baselineAutoAdjustEnabled(false);
@@ -538,7 +559,7 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
     public void testDestroyTaskLifecycle() throws Exception {
         taskLifecycleListener.reset();
 
-        IgniteEx ignite = prepareAndPopulateCluster(1, false);
+        IgniteEx ignite = prepareAndPopulateCluster(1, false, false);
 
         IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME);
 
@@ -564,6 +585,22 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
     }
 
     /**
+     * Tests that task removed from metastorage in beginning of next 
checkpoint.
+     *
+     * @throws Exception If failed.
+     */
+    @Test
+    public void testIndexDeletionTaskRemovedAfterCheckpointFinished() throws 
Exception {
+        prepareAndPopulateCluster(1, false, true);
+
+        awaitLatch(pendingDelLatch, "Test timed out: failed to await for 
durable background task completion.");
+
+        forceCheckpoint();
+
+        assertTrue(durableBackgroundTaskTestLsnr.check());
+    }
+
+    /**
      *
      */
     private class H2TreeTest extends H2Tree {
@@ -677,4 +714,66 @@ public class LongDestroyDurableBackgroundTaskTest extends 
GridCommonAbstractTest
             return super.destroyDownPages(bag, pageId, lvl, c, 
lockHoldStartTime, lockMaxTime, lockedPages);
         }
     }
+
+    /**
+     *
+     */
+    private class DurableBackgroundTaskTestListener implements 
DbCheckpointListener {
+        /**
+         * Prefix for metastorage keys for durable background tasks.
+         */
+        private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = 
"durable-background-task-";
+
+        /**
+         * Metastorage.
+         */
+        private volatile ReadOnlyMetastorage metastorage;
+
+        /**
+         * Task keys in metastorage.
+         */
+        private List<String> savedTasks = new ArrayList<>();
+
+        /** */
+        public DurableBackgroundTaskTestListener(ReadWriteMetastorage 
metastorage) {
+            this.metastorage = metastorage;
+        }
+
+        /**
+         * Checks that saved tasks from before checkpoint begin step removed 
from metastorage.
+         * Сall after the end of the checkpoint.
+         *
+         * @return true if check is successful.
+         */
+        public boolean check() throws IgniteCheckedException {
+            if (savedTasks.isEmpty())
+                return false;
+
+            for (String taskKey : savedTasks) {
+                DurableBackgroundTask task = 
(DurableBackgroundTask)metastorage.read(taskKey);
+
+                if (task != null)
+                    return false;
+            }
+
+            return true;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onMarkCheckpointBegin(Context ctx) {
+            /* No op. */
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onCheckpointBegin(Context ctx) {
+            /* No op. */
+        }
+
+        /** {@inheritDoc} */
+        @Override public void beforeCheckpointBegin(Context ctx) throws 
IgniteCheckedException {
+            metastorage.iterate(STORE_DURABLE_BACKGROUND_TASK_PREFIX,
+                    (key, val) -> savedTasks.add(key),
+                    true);
+        }
+    }
 }

Reply via email to