IGNITE-8768 Fixed JVM crash caused by an in-progress partition eviction during 
cache stop - Fixes #4227.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/56975c26
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/56975c26
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/56975c26

Branch: refs/heads/ignite-8446
Commit: 56975c266e7019f307bb9da42333a6db4e47365e
Parents: 16a7c98
Author: Pavel Kovalenko <jokse...@gmail.com>
Authored: Tue Jun 26 12:23:33 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Jun 26 12:23:33 2018 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheGroupContext.java     |   2 +
 .../cache/distributed/dht/EvictionContext.java  |  28 ++
 .../distributed/dht/GridDhtLocalPartition.java  |  14 +-
 .../dht/GridDhtPartitionsEvictor.java           | 326 ++++++++++++++-----
 4 files changed, 286 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
index 8a65038..99f9f97 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheGroupContext.java
@@ -723,6 +723,8 @@ public class CacheGroupContext {
         IgniteCheckedException err =
             new IgniteCheckedException("Failed to wait for topology update, 
cache (or node) is stopping.");
 
+        evictor.stop();
+
         aff.cancelFutures(err);
 
         preldr.onKernalStop();

http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java
new file mode 100644
index 0000000..0964c3c
--- /dev/null
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/EvictionContext.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.dht;
+
+/**
+ * Additional context for partition eviction process.
+ */
+public interface EvictionContext {
+    /**
+     * @return {@code true} If eviction process should be stopped.
+     */
+    public boolean shouldStop();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index 81249b6..e13c952 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -850,14 +850,14 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
     }
 
     /**
-     * Tries to start partition clear process {@link 
GridDhtLocalPartition#clearAll()}).
+     * Tries to start partition clear process ({@link 
GridDhtLocalPartition#clearAll(EvictionContext)}).
      * Only one thread is allowed to do such process concurrently.
      * At the end of clearing method completes {@code clearFuture}.
      *
      * @return {@code false} if clearing is not started due to existing 
reservations.
      * @throws NodeStoppingException If node is stopping.
      */
-    public boolean tryClear() throws NodeStoppingException {
+    public boolean tryClear(EvictionContext evictionCtx) throws 
NodeStoppingException {
         if (clearFuture.isDone())
             return true;
 
@@ -869,7 +869,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         if (addEvicting()) {
             try {
                 // Attempt to evict partition entries from cache.
-                long clearedEntities = clearAll();
+                long clearedEntities = clearAll(evictionCtx);
 
                 if (log.isDebugEnabled())
                     log.debug("Partition is cleared [clearedEntities=" + 
clearedEntities + ", part=" + this + "]");
@@ -985,7 +985,7 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
      * @return Number of rows cleared from page memory.
      * @throws NodeStoppingException If node stopping.
      */
-    private long clearAll() throws NodeStoppingException {
+    private long clearAll(EvictionContext evictionCtx) throws 
NodeStoppingException {
         GridCacheVersion clearVer = ctx.versions().next();
 
         GridCacheObsoleteEntryExtras extras = new 
GridCacheObsoleteEntryExtras(clearVer);
@@ -1001,6 +1001,8 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
         long cleared = 0;
 
+        final int stopCheckingFreq = 1000;
+
         CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
 
         try {
@@ -1051,6 +1053,10 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
 
                         cleared++;
                     }
+
+                    // After every 'stopCheckingFreq' cleared entities, check 
whether the clearing process should stop.
+                    if (cleared % stopCheckingFreq == 0 && 
evictionCtx.shouldStop())
+                        return cleared;
                 }
                 catch (GridDhtInvalidPartitionException e) {
                     assert isEmpty() && state() == EVICTED : "Invalid error 
[e=" + e + ", part=" + this + ']';

http://git-wip-us.apache.org/repos/asf/ignite/blob/56975c26/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
index 2a28921..7206397 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionsEvictor.java
@@ -16,21 +16,32 @@
  */
 package org.apache.ignite.internal.processors.cache.distributed.dht;
 
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.function.Function;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteSystemProperties;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.cache.CacheGroupContext;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
-import org.apache.ignite.internal.util.typedef.internal.GPC;
+import org.apache.ignite.internal.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.typedef.internal.LT;
 import org.apache.ignite.internal.util.typedef.internal.U;
 
 /**
- * Class that serves asynchronous partition eviction process.
+ * Class that serves asynchronous partition eviction process.
+ * Only one partition from group can be evicted at a time.
  */
 public class GridDhtPartitionsEvictor {
-    /** Show eviction progress frequency in ms. */
-    private static final int SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 1000; 
// 2 Minutes.
+    /** Default eviction progress show frequency. */
+    private static final int DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS = 2 * 60 * 
1000; // 2 Minutes.
+
+    /** Eviction progress frequency property name. */
+    private static final String SHOW_EVICTION_PROGRESS_FREQ = 
"SHOW_EVICTION_PROGRESS_FREQ";
 
     /** */
     private final GridCacheSharedContext<?, ?> ctx;
@@ -41,11 +52,31 @@ public class GridDhtPartitionsEvictor {
     /** */
     private final IgniteLogger log;
 
+    /** Lock object. */
+    private final Object mux = new Object();
+
     /** Queue contains partitions scheduled for eviction. */
-    private final ConcurrentHashMap<Integer, GridDhtLocalPartition> 
evictionQueue = new ConcurrentHashMap<>();
+    private final DeduplicationQueue<Integer, GridDhtLocalPartition> 
evictionQueue = new DeduplicationQueue<>(GridDhtLocalPartition::id);
+
+    /**
+     * Flag indicates that eviction process is running at the moment.
+     * This is needed to schedule partition eviction if there are no currently 
running self-scheduling eviction tasks.
+     * Guarded by {@link #mux}.
+     */
+    private boolean evictionRunning;
 
-    /** Flag indicates that eviction process is running at the moment, false 
in other case. */
-    private final AtomicBoolean evictionRunning = new AtomicBoolean();
+    /** Flag indicates that eviction process has stopped. */
+    private volatile boolean stop;
+
+    /** Future for currently running partition eviction task. */
+    private volatile GridFutureAdapter<Boolean> evictionFut;
+
+    /** Eviction progress frequency in ms. */
+    private final long evictionProgressFreqMs = 
IgniteSystemProperties.getLong(SHOW_EVICTION_PROGRESS_FREQ,
+        DEFAULT_SHOW_EVICTION_PROGRESS_FREQ_MS);
+
+    /** Next time at which eviction progress should be shown. */
+    private long nextShowProgressTime;
 
     /**
      * Constructor.
@@ -67,78 +98,213 @@ public class GridDhtPartitionsEvictor {
      * @param part Partition to evict.
      */
     public void evictPartitionAsync(GridDhtLocalPartition part) {
-        evictionQueue.putIfAbsent(part.id(), part);
-
-        if (evictionRunning.compareAndSet(false, true)) {
-            ctx.kernalContext().closure().callLocalSafe(new GPC<Boolean>() {
-                @Override public Boolean call() {
-                    boolean locked = true;
-
-                    long nextShowProgressTime = U.currentTimeMillis() + 
SHOW_EVICTION_PROGRESS_FREQ_MS;
-
-                    while (locked || !evictionQueue.isEmpty()) {
-                        if (!locked && !evictionRunning.compareAndSet(false, 
true))
-                            return false;
-
-                        try {
-                            for (GridDhtLocalPartition part : 
evictionQueue.values()) {
-                                // Show progress of currently evicting 
partitions.
-                                if (U.currentTimeMillis() >= 
nextShowProgressTime) {
-                                    if (log.isInfoEnabled())
-                                        log.info("Eviction in progress [grp=" 
+ grp.cacheOrGroupName()
-                                                + ", remainingCnt=" + 
evictionQueue.size() + "]");
-
-                                    nextShowProgressTime = 
U.currentTimeMillis() + SHOW_EVICTION_PROGRESS_FREQ_MS;
-                                }
-
-                                try {
-                                    boolean success = part.tryClear();
-
-                                    if (success) {
-                                        evictionQueue.remove(part.id());
-
-                                        if (part.state() == 
GridDhtPartitionState.EVICTED && part.markForDestroy())
-                                            part.destroy();
-                                    }
-                                }
-                                catch (Throwable ex) {
-                                    if (ctx.kernalContext().isStopping()) {
-                                        LT.warn(log, ex, "Partition eviction 
failed (current node is stopping).",
-                                                false,
-                                                true);
-
-                                        evictionQueue.clear();
-
-                                        return true;
-                                    }
-                                    else
-                                        LT.error(log, ex, "Partition eviction 
failed, this can cause grid hang.");
-                                }
-                            }
-                        }
-                        finally {
-                            if (!evictionQueue.isEmpty()) {
-                                if (ctx.kernalContext().isStopping()) {
-                                    evictionQueue.clear();
-
-                                    locked = false;
-                                }
-                                else
-                                    locked = true;
-                            }
-                            else {
-                                boolean res = 
evictionRunning.compareAndSet(true, false);
-
-                                assert res;
-
-                                locked = false;
-                            }
-                        }
-                    }
-
-                    return true;
+        if (stop)
+            return;
+
+        boolean added = evictionQueue.offer(part);
+
+        if (!added)
+            return;
+
+        synchronized (mux) {
+            if (!evictionRunning) {
+                nextShowProgressTime = U.currentTimeMillis() + 
evictionProgressFreqMs;
+
+                scheduleNextPartitionEviction();
+            }
+        }
+    }
+
+    /**
+     * Stops eviction process.
+     * Waits for the last scheduled partition eviction task to complete.
+     */
+    public void stop() {
+        stop = true;
+
+        synchronized (mux) {
+            // Wait for last offered partition eviction completion.
+            IgniteInternalFuture<Boolean> evictionFut0 = evictionFut;
+
+            if (evictionFut0 != null) {
+                try {
+                    evictionFut0.get();
+                }
+                catch (IgniteCheckedException e) {
+                    if (log.isDebugEnabled())
+                        log.warning("Failed to await partition eviction during 
stopping", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Gets next partition from the queue and schedules it for eviction.
+     */
+    private void scheduleNextPartitionEviction() {
+        if (stop)
+            return;
+
+        synchronized (mux) {
+            GridDhtLocalPartition next = evictionQueue.poll();
+
+            if (next != null) {
+                showProgress();
+
+                evictionFut = new GridFutureAdapter<>();
+
+                ctx.kernalContext().closure().callLocalSafe(new 
PartitionEvictionTask(next, () -> stop), true);
+            }
+            else
+                evictionRunning = false;
+        }
+    }
+
+    /**
+     * Shows progress of eviction.
+     */
+    private void showProgress() {
+        if (U.currentTimeMillis() >= nextShowProgressTime) {
+            int size = evictionQueue.size() + 1; // Queue size plus current 
partition.
+
+            if (log.isInfoEnabled())
+                log.info("Eviction in progress [grp=" + grp.cacheOrGroupName()
+                    + ", remainingPartsCnt=" + size + "]");
+
+            nextShowProgressTime = U.currentTimeMillis() + 
evictionProgressFreqMs;
+        }
+    }
+
+    /**
+     * Task for self-scheduled partition eviction / clearing.
+     */
+    private class PartitionEvictionTask implements Callable<Boolean> {
+        /** Partition to evict. */
+        private final GridDhtLocalPartition part;
+
+        /** Eviction context. */
+        private final EvictionContext evictionCtx;
+
+        /**
+         * @param part Partition.
+         * @param evictionCtx Eviction context.
+         */
+        public PartitionEvictionTask(GridDhtLocalPartition part, 
EvictionContext evictionCtx) {
+            this.part = part;
+            this.evictionCtx = evictionCtx;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Boolean call() throws Exception {
+            if (stop) {
+                evictionFut.onDone();
+
+                return false;
+            }
+
+            try {
+                boolean success = part.tryClear(evictionCtx);
+
+                if (success) {
+                    if (part.state() == GridDhtPartitionState.EVICTED && 
part.markForDestroy())
+                        part.destroy();
                 }
-            }, /*system pool*/ true);
+                else // Re-offer partition if clear was unsuccessful due to 
partition reservation.
+                    evictionQueue.offer(part);
+
+                // Complete the eviction future before scheduling the next one to prevent 
a deadlock between
+                // a concurrent eviction stop and the scheduling of a new eviction.
+                evictionFut.onDone();
+
+                scheduleNextPartitionEviction();
+
+                return true;
+            }
+            catch (Throwable ex) {
+                evictionFut.onDone(ex);
+
+                if (ctx.kernalContext().isStopping()) {
+                    LT.warn(log, ex, "Partition eviction failed (current node 
is stopping).",
+                        false,
+                        true);
+                }
+                else
+                    LT.error(log, ex, "Partition eviction failed, this can 
cause grid hang.");
+            }
+
+            return false;
+        }
+    }
+
+    /**
+     * Thread-safe blocking queue with items deduplication.
+     *
+     * @param <K> Key type of item used for deduplication.
+     * @param <V> Queue item type.
+     */
+    private static class DeduplicationQueue<K, V> {
+        /** Queue. */
+        private final Queue<V> queue;
+
+        /** Unique items set. */
+        private final Set<K> uniqueItems;
+
+        /** Key mapping function. */
+        private final Function<V, K> keyMappingFunction;
+
+        /**
+         * Constructor.
+         *
+         * @param keyExtractor Function to extract a key from a queue item.
+         *                     This key is used for deduplication if some item 
is offered twice.
+         */
+        public DeduplicationQueue(Function<V, K> keyExtractor) {
+            keyMappingFunction = keyExtractor;
+            queue = new LinkedBlockingQueue<>();
+            uniqueItems = new GridConcurrentHashSet<>();
+        }
+
+        /**
+         * Offers item to the queue.
+         *
+         * @param item Item.
+         * @return {@code true} if item has been successfully offered to the 
queue,
+         *         {@code false} if item was rejected because it already exists 
in the queue.
+         */
+        public boolean offer(V item) {
+            K key = keyMappingFunction.apply(item);
+
+            if (uniqueItems.add(key)) {
+                queue.offer(item);
+
+                return true;
+            }
+
+            return false;
+        }
+
+        /**
+         * Polls next item from queue.
+         *
+         * @return Next item or {@code null} if queue is empty.
+         */
+        public V poll() {
+            V item = queue.poll();
+
+            if (item != null) {
+                K key = keyMappingFunction.apply(item);
+
+                uniqueItems.remove(key);
+            }
+
+            return item;
+        }
+
+        /**
+         * @return Size of queue.
+         */
+        public int size() {
+            return queue.size();
         }
     }
 }

Reply via email to