This is an automated email from the ASF dual-hosted git repository.

sergeychugunov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push: new 7498164 IGNITE-13444 Durable tasks are cancelled on grid deactivation, starting new tasks is prohibited - Fixes #8244. 7498164 is described below commit 749816481149b9a54e9b780ea2289aafd1eeecef Author: makedonskaya <m.a.makedonsk...@gmail.com> AuthorDate: Tue Apr 20 11:25:28 2021 +0300 IGNITE-13444 Durable tasks are cancelled on grid deactivation, starting new tasks is prohibited - Fixes #8244. Signed-off-by: Sergey Chugunov <sergey.chugu...@gmail.com> --- .../cluster/GridClusterStateProcessor.java | 2 + .../localtask/DurableBackgroundTasksProcessor.java | 59 ++++++++++++++---- .../db/LongDestroyDurableBackgroundTaskTest.java | 70 +++++++++++++++++++--- 3 files changed, 112 insertions(+), 19 deletions(-) diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java index a774d520..73afdf9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cluster/GridClusterStateProcessor.java @@ -729,6 +729,8 @@ public class GridClusterStateProcessor extends GridProcessorAdapter implements I nodeIds ); + ctx.durableBackgroundTasksProcessor().onStateChange(msg); + if (msg.forceChangeBaselineTopology()) newState.setTransitionResult(msg.requestId(), msg.state()); diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java index c703df4..dfbab72 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java +++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import org.apache.ignite.cluster.ClusterState; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.internal.GridKernalContext; import org.apache.ignite.internal.client.util.GridConcurrentHashSet; @@ -34,6 +35,7 @@ import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadO import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage; import org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask; import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage; +import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.worker.GridWorker; import org.apache.ignite.thread.IgniteThread; @@ -64,6 +66,16 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem /** Durable background tasks map. */ private final ConcurrentHashMap<String, DurableBackgroundTask> durableBackgroundTasks = new ConcurrentHashMap<>(); + /** Set of started tasks' names. */ + private final Set<String> startedTasks = new GridConcurrentHashSet<>(); + + /** + * Ban to start new tasks. The first time the cluster is activated, it will try again to run existing tasks. + * + * @see #onStateChangeFinish(ChangeGlobalStateFinishMessage) + */ + private volatile boolean forbidStartingNewTasks; + /** * @param ctx Kernal context. 
*/ @@ -78,22 +90,31 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem assert durableBackgroundTasks != null; for (DurableBackgroundTask task : durableBackgroundTasks.values()) { - if (!task.isCompleted()) - asyncDurableBackgroundTaskExecute(task, false); + if (!task.isCompleted() && startedTasks.add(task.shortName())) + asyncDurableBackgroundTaskExecute(task); } } /** * Creates a worker to execute single durable background task. + * * @param task Task. - * @param dropTaskIfFailed Whether to delete task from metastorage, if it has failed. */ - private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task, boolean dropTaskIfFailed) { + private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task) { String workerName = "async-durable-background-task-executor-" + asyncDurableBackgroundTasksWorkersCntr.getAndIncrement(); GridWorker worker = new GridWorker(ctx.igniteInstanceName(), workerName, log) { + @Override public void cancel() { + task.onCancel(); + + super.cancel(); + } + @Override protected void body() { try { + if (forbidStartingNewTasks) + return; + log.info("Executing durable background task: " + task.shortName()); task.execute(ctx); @@ -104,11 +125,10 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem } catch (Throwable e) { log.error("Could not execute durable background task: " + task.shortName(), e); - - if (dropTaskIfFailed) - removeDurableBackgroundTask(task); } finally { + startedTasks.remove(task.shortName()); + asyncDurableBackgroundTaskWorkers.remove(this); } } @@ -128,8 +148,9 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem /** {@inheritDoc} */ @Override public void onKernalStop(boolean cancel) { - // Waiting for workers, but not cancelling them, trying to complete running tasks. 
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, false, log); + forbidStartingNewTasks = true; + + awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log); } /** {@inheritDoc} */ @@ -140,9 +161,23 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem /** * @param msg Message. */ - public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { - if (!msg.clusterActive()) + public void onStateChange(ChangeGlobalStateMessage msg) { + if (msg.state() == ClusterState.INACTIVE) { + forbidStartingNewTasks = true; + awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log); + } + } + + /** + * @param msg Message. + */ + public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) { + if (msg.state() != ClusterState.INACTIVE) { + forbidStartingNewTasks = false; + + asyncDurableBackgroundTasksExecution(); + } } /** {@inheritDoc} */ @@ -267,7 +302,7 @@ public class DurableBackgroundTasksProcessor extends GridProcessorAdapter implem if (CU.isPersistentCache(ccfg, ctx.config().getDataStorageConfiguration())) addDurableBackgroundTask(task); - asyncDurableBackgroundTaskExecute(task, false); + asyncDurableBackgroundTaskExecute(task); } /** {@inheritDoc} */ diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java index 5c4cb04..5fdfa62 100644 --- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java +++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/LongDestroyDurableBackgroundTaskTest.java @@ -47,6 +47,7 @@ import org.apache.ignite.configuration.ClientConfiguration; import org.apache.ignite.configuration.DataRegionConfiguration; import 
org.apache.ignite.configuration.DataStorageConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.failure.StopNodeFailureHandler; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.cache.query.index.sorted.IndexKeyTypeSettings; import org.apache.ignite.internal.cache.query.index.sorted.IndexRow; @@ -163,19 +164,30 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest /** */ @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { return super.getConfiguration(igniteInstanceName) + .setFailureHandler(new StopNodeFailureHandler()) .setDataStorageConfiguration( - new DataStorageConfiguration().setDefaultDataRegionConfiguration( - new DataRegionConfiguration() - .setPersistenceEnabled(true) - .setInitialSize(10 * 1024L * 1024L) - .setMaxSize(50 * 1024L * 1024L) - ) - .setCheckpointFrequency(Long.MAX_VALUE / 2) + new DataStorageConfiguration() + .setDefaultDataRegionConfiguration( + new DataRegionConfiguration() + .setPersistenceEnabled(true) + .setInitialSize(10 * 1024L * 1024L) + .setMaxSize(50 * 1024L * 1024L) + ) + .setDataRegionConfigurations( + new DataRegionConfiguration() + .setName("dr1") + .setPersistenceEnabled(false) + ) + .setCheckpointFrequency(Long.MAX_VALUE / 2) ) .setCacheConfiguration( new CacheConfiguration(DEFAULT_CACHE_NAME) .setBackups(1) + .setSqlSchema("PUBLIC"), + new CacheConfiguration<Integer, Integer>("TEST") .setSqlSchema("PUBLIC") + .setBackups(1) + .setDataRegionName("dr1") ) .setGridLogger(testLog); } @@ -542,6 +554,50 @@ public class LongDestroyDurableBackgroundTaskTest extends GridCommonAbstractTest } /** + * Test case when cluster deactivation happens with no-persistence cache. Index tree deletion task should not be + * started after stopping cache. + * + * @throws Exception If failed. 
+ */ + @Test + public void testClusterDeactivationShouldPassWithoutErrors() throws Exception { + IgniteEx ignite = startGrids(NODES_COUNT); + + ignite.cluster().active(true); + + IgniteCache<Integer, Integer> cache = ignite.cache("TEST"); + + query(cache, "create table TEST (id integer primary key, p integer, f integer) with " + + "\"DATA_REGION=dr1\""); + + query(cache, "create index TEST_IDX on TEST (p)"); + + for (int i = 0; i < 5_000; i++) + query(cache, "insert into TEST (id, p, f) values (?, ?, ?)", i, i, i); + + LogListener lsnr = LogListener.matches("Could not execute durable background task").build(); + LogListener lsnr2 = LogListener.matches("Executing durable background task").build(); + LogListener lsnr3 = LogListener.matches("Execution of durable background task completed").build(); + + testLog.registerAllListeners(lsnr, lsnr2, lsnr3); + + ignite.cluster().active(false); + + doSleep(1_000); + + assertFalse(lsnr.check()); + assertFalse(lsnr2.check()); + assertFalse(lsnr3.check()); + + testLog.unregisterListener(lsnr); + testLog.unregisterListener(lsnr2); + testLog.unregisterListener(lsnr3); + + for (int i = 0; i < NODES_COUNT; i++) + grid(i); + } + + /** * Tests case when long index deletion operation happens. Checkpoint should run in the middle of index deletion * operation. *