This is an automated email from the ASF dual-hosted git repository. sergeychugunov pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 04ae61f IGNITE-13382 DurableBackgroundTask can abandon incomplete task - Fixes #8182. 04ae61f is described below commit 04ae61f668b65a6f7881f090c9481cdf3ccad25c Author: makedonskaya <m.a.makedonsk...@gmail.com> AuthorDate: Tue Sep 1 17:11:07 2020 +0300 IGNITE-13382 DurableBackgroundTask can abandon incomplete task - Fixes #8182. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../pendingtask/DurableBackgroundTask.java | 17 ++++ .../localtask/DurableBackgroundTasksProcessor.java | 35 ++++++- .../h2/DurableBackgroundCleanupIndexTreeTask.java | 19 ++++ .../db/LongDestroyDurableBackgroundTaskTest.java | 105 ++++++++++++++++++++- 4 files changed, 168 insertions(+), 8 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java index a9e8d9e..355f624 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/metastorage/pendingtask/DurableBackgroundTask.java @@ -38,4 +38,21 @@ public interface DurableBackgroundTask extends Serializable { * @param ctx Grid kernal context. */ public void execute(GridKernalContext ctx); + + /** + * Method that marks task as complete. + */ + public void complete(); + + /** + * Method that return completion flag. + * + * @return flag that task completed. + */ + public boolean isCompleted(); + + /** + * Callback for task cancellation. 
+ */ + public void onCancel(); } diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java index 05d82bf..559edb8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java @@ -26,6 +26,8 @@ import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.client.util.GridConcurrentHashSet; import org.apache.ignite.internal.processors.GridProcessorAdapter; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener; import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree; import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; @@ -42,7 +44,8 @@ import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop; * Processor that is responsible for durable background tasks that are executed on local node * and should be continued even after node restart. */ -public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener { +public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implements MetastorageLifecycleListener, + DbCheckpointListener { /** Prefix for metastorage keys for durable background tasks. 
*/ private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-"; @@ -74,8 +77,10 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem private void asyncDurableBackgroundTasksExecution() { assert durableBackgroundTasks != null; - for (DurableBackgroundTask task : durableBackgroundTasks.values()) - asyncDurableBackgroundTaskExecute(task, false); + for (DurableBackgroundTask task : durableBackgroundTasks.values()) { + if (!task.isCompleted()) + asyncDurableBackgroundTaskExecute(task, false); + } } /** @@ -93,9 +98,9 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem task.execute(ctx); - log.info("Execution of durable background task completed: " + task.shortName()); + task.complete(); - removeDurableBackgroundTask(task); + log.info("Execution of durable background task completed: " + task.shortName()); } catch (Throwable e) { log.error("Could not execute durable background task: " + task.shortName(), e); @@ -172,6 +177,8 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem } } + ((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this); + this.metastorage = metastorage; } @@ -262,4 +269,22 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem asyncDurableBackgroundTaskExecute(task, false); } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + for (DurableBackgroundTask task : durableBackgroundTasks.values()) { + if (task.isCompleted()) + removeDurableBackgroundTask(task); + } + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context ctx) { + /* No op. */ + } + + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context ctx) { + /* No op. 
*/ + } } diff --git a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java index ba40a8c..23e9af7 100644 --- a/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java +++ b/modules/indexing/src/main/java/org/apache/ignite/internal/processors/query/h2/DurableBackgroundCleanupIndexTreeTask.java @@ -48,6 +48,9 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT private transient List<H2Tree> trees; /** */ + private transient volatile boolean completed; + + /** */ private String cacheGrpName; /** */ @@ -73,6 +76,7 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT ) { this.rootPages = rootPages; this.trees = trees; + this.completed = false; this.cacheGrpName = cacheGrpName; this.cacheName = cacheName; this.schemaName = schemaName; @@ -168,6 +172,21 @@ public class DurableBackgroundCleanupIndexTreeTask implements DurableBackgroundT } /** {@inheritDoc} */ + @Override public void complete() { + completed = true; + } + + /** {@inheritDoc} */ + @Override public boolean isCompleted() { + return completed; + } + + /** {@inheritDoc} */ + @Override public void onCancel() { + trees = null; + } + + /** {@inheritDoc} */ @Override public String toString() { return S.toString(DurableBackgroundCleanupIndexTreeTask.class, this); } diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java index c847031..fe84b78 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java 
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java @@ -16,6 +16,7 @@ */ package org.apache.ignite.internal.processors.cache.persistence.db; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Deque; @@ -46,6 +47,12 @@ import org.apache.ignite.internal.metric.IoStatisticsHolder; import org.apache.ignite.internal.pagemem.PageMemory; import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.persistence.DbCheckpointListener; +import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; +import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag; import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; import org.apache.ignite.internal.processors.failure.FailureProcessor; @@ -144,6 +151,9 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest private H2TreeIndex.H2TreeFactory regularH2TreeFactory; /** */ + private DurableBackgroundTaskTestListener durableBackgroundTaskTestLsnr; + + /** */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { return super.getConfiguration(igniteInstanceName) .setDataStorageConfiguration( @@ -189,6 +199,8 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest cleanPersistenceDir(); + 
durableBackgroundTaskTestLsnr = null; + super.afterTest(); } @@ -217,7 +229,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest int nodeCnt = NODES_COUNT; - Ignite ignite = prepareAndPopulateCluster(nodeCnt, multicolumn); + Ignite ignite = prepareAndPopulateCluster(nodeCnt, multicolumn, false); Ignite aliveNode = grid(ALWAYS_ALIVE_NODE_NUM); @@ -428,9 +440,18 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest * @return Ignite instance. * @throws Exception If failed. */ - private IgniteEx prepareAndPopulateCluster(int nodeCnt, boolean multicolumn) throws Exception { + private IgniteEx prepareAndPopulateCluster(int nodeCnt, boolean multicolumn, boolean createLsnr) throws Exception { IgniteEx ignite = startGrids(nodeCnt); + if (createLsnr) { + GridCacheSharedContext ctx = ignite.context().cache().context(); + + durableBackgroundTaskTestLsnr = new DurableBackgroundTaskTestListener(ctx.database().metaStorage()); + + ((GridCacheDatabaseSharedManager) ctx.cache().context().database()) + .addCheckpointListener(durableBackgroundTaskTestLsnr); + } + ignite.cluster().active(true); ignite.cluster().baselineAutoAdjustEnabled(false); @@ -538,7 +559,7 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest public void testDestroyTaskLifecycle() throws Exception { taskLifecycleListener.reset(); - IgniteEx ignite = prepareAndPopulateCluster(1, false); + IgniteEx ignite = prepareAndPopulateCluster(1, false, false); IgniteCache<Integer, Integer> cache = ignite.cache(DEFAULT_CACHE_NAME); @@ -564,6 +585,22 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest } /** + * Tests that task removed from metastorage in beginning of next checkpoint. + * + * @throws Exception If failed. 
+ */ + @Test + public void testIndexDeletionTaskRemovedAfterCheckpointFinished() throws Exception { + prepareAndPopulateCluster(1, false, true); + + awaitLatch(pendingDelLatch, "Test timed out: failed to await for durable background task completion."); + + forceCheckpoint(); + + assertTrue(durableBackgroundTaskTestLsnr.check()); + } + + /** * */ private class H2TreeTest extends H2Tree { @@ -677,4 +714,66 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest return super.destroyDownPages(bag, pageId, lvl, c, lockHoldStartTime, lockMaxTime, lockedPages); } } + + /** + * + */ + private class DurableBackgroundTaskTestListener implements DbCheckpointListener { + /** + * Prefix for metastorage keys for durable background tasks. + */ + private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX = "durable-background-task-"; + + /** + * Metastorage. + */ + private volatile ReadOnlyMetastorage metastorage; + + /** + * Task keys in metastorage. + */ + private List<String> savedTasks = new ArrayList<>(); + + /** */ + public DurableBackgroundTaskTestListener(ReadWriteMetastorage metastorage) { + this.metastorage = metastorage; + } + + /** + * Checks that saved tasks from before checkpoint begin step removed from metastorage. + * Call after the end of the checkpoint. + * + * @return true if check is successful. + */ + public boolean check() throws IgniteCheckedException { + if (savedTasks.isEmpty()) + return false; + + for (String taskKey : savedTasks) { + DurableBackgroundTask task = (DurableBackgroundTask)metastorage.read(taskKey); + + if (task != null) + return false; + } + + return true; + } + + /** {@inheritDoc} */ + @Override public void onMarkCheckpointBegin(Context ctx) { + /* No op. */ + } + + /** {@inheritDoc} */ + @Override public void onCheckpointBegin(Context ctx) { + /* No op. 
*/ + } + + /** {@inheritDoc} */ + @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException { + metastorage.iterate(STORE_DURABLE_BACKGROUND_TASK_PREFIX, + (key, val) -> savedTasks.add(key), + true); + } + } }