IGNITE-8768 Fixed JVM crash caused by an in-progress partition eviction during cache stop - Fixes #4227.
Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56975c26 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56975c26 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56975c26 Branch: refs/heads/ignite-8446 Commit: 56975c266e7019f307bb9da42333a6db4e47365e Parents: 16a7c98 Author: Pavel Kovalenko <jokse...@gmail.com> Authored: Tue Jun 26 12:23:33 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Jun 26 12:23:33 2018 +0300 ---------------------------------------------------------------------- .../processors/cache/CacheGroupContext.java | 2 + .../cache/distributed/dht/EvictionContext.java | 28 ++ .../distributed/dht/GridDhtLocalPartition.java | 14 +- .../dht/GridDhtPartitionsEvictor.java | 326 ++++++++++++++----- 4 files changed, 286 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java index 8a65038..99f9f97 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java @@ -723,6 +723,8 @@ public class CacheGroupContext { IgniteCheckedException err = new IgniteCheckedException("Failed to wait for topology update, cache (or node) is stopping."); + evictor.stop(); + aff.cancelFutures(err); preldr.onKernalStop(); http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java new file mode 100644 index 0000000..0964c3c --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +/** + * Additional context for partition eviction process. + */ +public interface EvictionContext { + /** + * @return {@code true} If eviction process should be stopped. + */ + public boolean shouldStop(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 81249b6..e13c952 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -850,14 +850,14 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements } /** - * Tries to start partition clear process {@link GridDhtLocalPartition#clearAll()}). + * Tries to start partition clear process {@link GridDhtLocalPartition#clearAll(EvictionContext)}). * Only one thread is allowed to do such process concurrently. * At the end of clearing method completes {@code clearFuture}. * * @return {@code false} if clearing is not started due to existing reservations. * @throws NodeStoppingException If node is stopping. */ - public boolean tryClear() throws NodeStoppingException { + public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException { if (clearFuture.isDone()) return true; @@ -869,7 +869,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements if (addEvicting()) { try { // Attempt to evict partition entries from cache. - long clearedEntities = clearAll(); + long clearedEntities = clearAll(evictionCtx); if (log.isDebugEnabled()) log.debug("Partition is cleared [clearedEntities=" + clearedEntities + ", part=" + this + "]"); @@ -985,7 +985,7 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements * @return Number of rows cleared from page memory. * @throws NodeStoppingException If node stopping. */ - private long clearAll() throws NodeStoppingException { + private long clearAll(EvictionContext evictionCtx) throws NodeStoppingException { GridCacheVersion clearVer = ctx.versions().next(); GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); @@ -1001,6 +1001,8 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements long cleared = 0; + final int stopCheckingFreq = 1000; + CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; try { @@ -1051,6 +1053,10 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements cleared++; } + + // For each 'stopCheckingFreq' cleared entities check clearing process to stop. + if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop()) + return cleared; } catch (GridDhtInvalidPartitionException e) { assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java index 2a28921..7206397 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java @@ -16,21 +16,32 @@ */ package org.apache.ignite.internal.processors.cache.distributed.dht; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.function.Function; +import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.IgniteSystemProperties; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.CacheGroupContext; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; -import org.apache.ignite.internal.util.typedef.internal.GPC; +import org.apache.ignite.internal.util.GridConcurrentHashSet; +import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.typedef.internal.LT; import org.apache.ignite.internal.util.typedef.internal.U; /** - * Class that serves asynchronous partition eviction process. + * Class that serves asynchronous part eviction process. + * Only one partition from group can be evicted at the moment. */ public class GridDhtPartitionsEvictor { - /** Show eviction progress frequency in ms. */ - private static final int SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes. + /** Default eviction progress show frequency. */ + private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; // 2 Minutes. + + /** Eviction progress frequency property name. */ + private static final String SHOW_EVICTION_PROGRESS_FREQ = "SHOW_EVICTION_PROGRESS_FREQ"; /** */ private final GridCacheSharedContext<?, ?> ctx; @@ -41,11 +52,31 @@ public class GridDhtPartitionsEvictor { /** */ private final IgniteLogger log; + /** Lock object. */ + private final Object mux = new Object(); + /** Queue contains partitions scheduled for eviction. */ - private final ConcurrentHashMap<Integer, GridDhtLocalPartition> evictionQueue = new ConcurrentHashMap<>(); + private final DeduplicationQueue<Integer, GridDhtLocalPartition> evictionQueue = new DeduplicationQueue<>(GridDhtLocalPartition::id); + + /** + * Flag indicates that eviction process is running at the moment. + * This is needed to schedule partition eviction if there are no currently running self-scheduling eviction tasks. + * Guarded by {@link #mux}. + */ + private boolean evictionRunning; - /** Flag indicates that eviction process is running at the moment, false in other case. */ - private final AtomicBoolean evictionRunning = new AtomicBoolean(); + /** Flag indicates that eviction process has stopped. */ + private volatile boolean stop; + + /** Future for currently running partition eviction task. */ + private volatile GridFutureAdapter<Boolean> evictionFut; + + /** Eviction progress frequency in ms. */ + private final long evictionProgressFreqMs = IgniteSystemProperties.getLong(SHOW_EVICTION_PROGRESS_FREQ, + DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS); + + /** Next time of show eviction progress. */ + private long nextShowProgressTime; /** * Constructor. @@ -67,78 +98,213 @@ public class GridDhtPartitionsEvictor { * @param part Partition to evict. */ public void evictPartitionAsync(GridDhtLocalPartition part) { - evictionQueue.putIfAbsent(part.id(), part); - - if (evictionRunning.compareAndSet(false, true)) { - ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() { - @Override public Boolean call() { - boolean locked = true; - - long nextShowProgressTime = U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS; - - while (locked || !evictionQueue.isEmpty()) { - if (!locked && !evictionRunning.compareAndSet(false, true)) - return false; - - try { - for (GridDhtLocalPartition part : evictionQueue.values()) { - // Show progress of currently evicting partitions. - if (U.currentTimeMillis() >= nextShowProgressTime) { - if (log.isInfoEnabled()) - log.info("Eviction in progress [grp=" + grp.cacheOrGroupName() - + ", remainingCnt=" + evictionQueue.size() + "]"); - - nextShowProgressTime = U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS; - } - - try { - boolean success = part.tryClear(); - - if (success) { - evictionQueue.remove(part.id()); - - if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) - part.destroy(); - } - } - catch (Throwable ex) { - if (ctx.kernalContext().isStopping()) { - LT.warn(log, ex, "Partition eviction failed (current node is stopping).", - false, - true); - - evictionQueue.clear(); - - return true; - } - else - LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); - } - } - } - finally { - if (!evictionQueue.isEmpty()) { - if (ctx.kernalContext().isStopping()) { - evictionQueue.clear(); - - locked = false; - } - else - locked = true; - } - else { - boolean res = evictionRunning.compareAndSet(true, false); - - assert res; - - locked = false; - } - } - } - - return true; + if (stop) + return; + + boolean added = evictionQueue.offer(part); + + if (!added) + return; + + synchronized (mux) { + if (!evictionRunning) { + nextShowProgressTime = U.currentTimeMillis() + evictionProgressFreqMs; + + scheduleNextPartitionEviction(); + } + } + } + + /** + * Stops eviction process. + * Method awaits last offered partition eviction. + */ + public void stop() { + stop = true; + + synchronized (mux) { + // Wait for last offered partition eviction completion. + IgniteInternalFuture<Boolean> evictionFut0 = evictionFut; + + if (evictionFut0 != null) { + try { + evictionFut0.get(); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.warning("Failed to await partition eviction during stopping", e); + } + } + } + } + + /** + * Gets next partition from the queue and schedules it for eviction. + */ + private void scheduleNextPartitionEviction() { + if (stop) + return; + + synchronized (mux) { + GridDhtLocalPartition next = evictionQueue.poll(); + + if (next != null) { + showProgress(); + + evictionFut = new GridFutureAdapter<>(); + + ctx.kernalContext().closure().callLocalSafe(new PartitionEvictionTask(next, () -> stop), true); + } + else + evictionRunning = false; + } + } + + /** + * Shows progress of eviction. + */ + private void showProgress() { + if (U.currentTimeMillis() >= nextShowProgressTime) { + int size = evictionQueue.size() + 1; // Queue size plus current partition. + + if (log.isInfoEnabled()) + log.info("Eviction in progress [grp=" + grp.cacheOrGroupName() + + ", remainingPartsCnt=" + size + "]"); + + nextShowProgressTime = U.currentTimeMillis() + evictionProgressFreqMs; + } + } + + /** + * Task for self-scheduled partition eviction / clearing. + */ + private class PartitionEvictionTask implements Callable<Boolean> { + /** Partition to evict. */ + private final GridDhtLocalPartition part; + + /** Eviction context. */ + private final EvictionContext evictionCtx; + + /** + * @param part Partition. + * @param evictionCtx Eviction context. + */ + public PartitionEvictionTask(GridDhtLocalPartition part, EvictionContext evictionCtx) { + this.part = part; + this.evictionCtx = evictionCtx; + } + + /** {@inheritDoc} */ + @Override public Boolean call() throws Exception { + if (stop) { + evictionFut.onDone(); + + return false; + } + + try { + boolean success = part.tryClear(evictionCtx); + + if (success) { + if (part.state() == GridDhtPartitionState.EVICTED && part.markForDestroy()) + part.destroy(); } - }, /*system pool*/ true); + else // Re-offer partition if clear was unsuccessful due to partition reservation. + evictionQueue.offer(part); + + // Complete eviction future before schedule new to prevent deadlock with + // simultaneous eviction stopping and scheduling new eviction. + evictionFut.onDone(); + + scheduleNextPartitionEviction(); + + return true; + } + catch (Throwable ex) { + evictionFut.onDone(ex); + + if (ctx.kernalContext().isStopping()) { + LT.warn(log, ex, "Partition eviction failed (current node is stopping).", + false, + true); + } + else + LT.error(log, ex, "Partition eviction failed, this can cause grid hang."); + } + + return false; + } + } + + /** + * Thread-safe blocking queue with items deduplication. + * + * @param <K> Key type of item used for deduplication. + * @param <V> Queue item type. + */ + private static class DeduplicationQueue<K, V> { + /** Queue. */ + private final Queue<V> queue; + + /** Unique items set. */ + private final Set<K> uniqueItems; + + /** Key mapping function. */ + private final Function<V, K> keyMappingFunction; + + /** + * Constructor. + * + * @param keyExtractor Function to extract a key from a queue item. + * This key is used for deduplication if some item has offered twice. + */ + public DeduplicationQueue(Function<V, K> keyExtractor) { + keyMappingFunction = keyExtractor; + queue = new LinkedBlockingQueue<>(); + uniqueItems = new GridConcurrentHashSet<>(); + } + + /** + * Offers item to the queue. + * + * @param item Item. + * @return {@code true} if item has been successfully offered to the queue, + * {@code false} if item was rejected because already exists in the queue. + */ + public boolean offer(V item) { + K key = keyMappingFunction.apply(item); + + if (uniqueItems.add(key)) { + queue.offer(item); + + return true; + } + + return false; + } + + /** + * Polls next item from queue. + * + * @return Next item or {@code null} if queue is empty. + */ + public V poll() { + V item = queue.poll(); + + if (item != null) { + K key = keyMappingFunction.apply(item); + + uniqueItems.remove(key); + } + + return item; + } + + /** + * @return Size of queue. + */ + public int size() { + return queue.size(); } } }