ignite-4851 : Partition will be reserved before new entry is created.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c61d1387
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c61d1387
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c61d1387

Branch: refs/heads/ignite-3477-master
Commit: c61d13875de9e199ebe68145dabd6a88544d7673
Parents: 17bc34d
Author: Ilya Lantukh <[email protected]>
Authored: Wed Apr 5 13:39:58 2017 +0300
Committer: Ilya Lantukh <[email protected]>
Committed: Wed Apr 5 13:39:58 2017 +0300

----------------------------------------------------------------------
 .../cache/GridCacheConcurrentMap.java           |   5 +-
 .../cache/GridCacheConcurrentMapImpl.java       | 192 +++++++++++--------
 .../dht/GridCachePartitionedConcurrentMap.java  |  15 +-
 .../distributed/dht/GridDhtLocalPartition.java  |  12 --
 4 files changed, 130 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
index debc65b..a6738f3 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java
@@ -40,8 +40,9 @@ public interface GridCacheConcurrentMap {
      * @param key Key.
      * @param val Value.
      * @param create Create flag.
-     * @return Triple where the first element is current entry associated with 
the key,
-     *      the second is created entry and the third is doomed (all may be 
null).
+     * @return Existing or new GridCacheMapEntry. Will return {@code null} if 
entry is obsolete or absent and create
+     * flag is set to {@code false}. Will also return {@code null} if create 
flag is set to {@code true}, but entry
+     * couldn't be created.
      */
     @Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent(
         AffinityTopologyVersion topVer,

http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
index c1dbd0c..15a688b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgnitePredicate;
@@ -118,104 +117,145 @@ public abstract class GridCacheConcurrentMapImpl 
implements GridCacheConcurrentM
 
         boolean done = false;
 
-        while (!done) {
-            GridCacheMapEntry entry = map.get(key);
-            created = null;
-            doomed = null;
+        boolean reserved = false;
 
-            if (entry == null) {
-                if (create) {
-                    if (created0 == null)
-                        created0 = factory.create(ctx, topVer, key, 
key.hashCode(), val);
+        try {
+            while (!done) {
+                GridCacheMapEntry entry = map.get(key);
+                created = null;
+                doomed = null;
 
-                    cur = created = created0;
+                if (entry == null) {
+                    if (create) {
+                        if (created0 == null) {
+                            if (!reserved) {
+                                if (!reserve())
+                                    return null;
 
-                    done = map.putIfAbsent(created.key(), created) == null;
-                }
-                else
-                    done = true;
-            }
-            else {
-                if (entry.obsolete()) {
-                    doomed = entry;
+                                reserved = true;
+                            }
 
-                    if (create) {
-                        if (created0 == null)
                             created0 = factory.create(ctx, topVer, key, 
key.hashCode(), val);
+                        }
 
                         cur = created = created0;
 
-                        done = map.replace(entry.key(), doomed, created);
+                        done = map.putIfAbsent(created.key(), created) == null;
                     }
                     else
-                        done = map.remove(entry.key(), doomed);
+                        done = true;
                 }
                 else {
-                    cur = entry;
+                    if (entry.obsolete()) {
+                        doomed = entry;
+
+                        if (create) {
+                            if (created0 == null) {
+                                if (!reserved) {
+                                    if (!reserve())
+                                        return null;
+
+                                    reserved = true;
+                                }
+
+                                created0 = factory.create(ctx, topVer, key, 
key.hashCode(), val);
+                            }
 
-                    done = true;
+                            cur = created = created0;
+
+                            done = map.replace(entry.key(), doomed, created);
+                        }
+                        else
+                            done = map.remove(entry.key(), doomed);
+                    }
+                    else {
+                        cur = entry;
+
+                        done = true;
+                    }
                 }
             }
-        }
 
-        int sizeChange = 0;
+            int sizeChange = 0;
+
+            if (doomed != null) {
+                synchronized (doomed) {
+                    if (!doomed.deleted())
+                        sizeChange--;
+                }
 
-        if (doomed != null) {
-            synchronized (doomed) {
-                if (!doomed.deleted())
-                    sizeChange--;
+                if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
+                    ctx.events().addEvent(doomed.partition(),
+                        doomed.key(),
+                        ctx.localNodeId(),
+                        (IgniteUuid)null,
+                        null,
+                        EVT_CACHE_ENTRY_DESTROYED,
+                        null,
+                        false,
+                        null,
+                        false,
+                        null,
+                        null,
+                        null,
+                        true);
             }
 
-            if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED))
-                ctx.events().addEvent(doomed.partition(),
-                    doomed.key(),
-                    ctx.localNodeId(),
-                    (IgniteUuid)null,
-                    null,
-                    EVT_CACHE_ENTRY_DESTROYED,
-                    null,
-                    false,
-                    null,
-                    false,
-                    null,
-                    null,
-                    null,
-                    true);
-        }
+            if (created != null) {
+                sizeChange++;
+
+                if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED))
+                    ctx.events().addEvent(created.partition(),
+                        created.key(),
+                        ctx.localNodeId(),
+                        (IgniteUuid)null,
+                        null,
+                        EVT_CACHE_ENTRY_CREATED,
+                        null,
+                        false,
+                        null,
+                        false,
+                        null,
+                        null,
+                        null,
+                        true);
+
+                if (touch)
+                    ctx.evicts().touch(
+                        cur,
+                        topVer);
+            }
 
-        if (created != null) {
-            sizeChange++;
-
-            if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED))
-                ctx.events().addEvent(created.partition(),
-                    created.key(),
-                    ctx.localNodeId(),
-                    (IgniteUuid)null,
-                    null,
-                    EVT_CACHE_ENTRY_CREATED,
-                    null,
-                    false,
-                    null,
-                    false,
-                    null,
-                    null,
-                    null,
-                    true);
-
-            if (touch)
-                ctx.evicts().touch(
-                    cur,
-                    topVer);
-        }
+            assert Math.abs(sizeChange) <= 1;
+
+            if (sizeChange == -1)
+                decrementPublicSize(cur);
+            else if (sizeChange == 1) {
+                assert reserved;
 
-        assert Math.abs(sizeChange) <= 1;
+                incrementPublicSize(cur);
+            }
+
+            return cur;
+        }
+        finally {
+            if (reserved)
+                release();
+        }
+    }
 
-        if (sizeChange == -1)
-            decrementPublicSize(cur);
-        else if (sizeChange == 1)
-            incrementPublicSize(cur);
+    /**
+     *
+     */
+    protected boolean reserve() {
+        return true;
+    }
 
-        return cur;
+    /**
+     *
+     */
+    protected void release() {
+        // No-op.
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
index cfbe9bb..6230c85 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java
@@ -81,12 +81,19 @@ public class GridCachePartitionedConcurrentMap implements 
GridCacheConcurrentMap
     /** {@inheritDoc} */
     @Override public GridCacheMapEntry 
putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key,
         @Nullable CacheObject val, boolean create, boolean touch) {
-        GridDhtLocalPartition part = localPartition(key, topVer, create);
+        while (true) {
+            GridDhtLocalPartition part = localPartition(key, topVer, create);
 
-        if (part == null)
-            return null;
+            if (part == null)
+                return null;
+
+            GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, 
key, val, create, touch);
 
-        return part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, 
touch);
+            if (res != null || !create)
+                return res;
+
+            // Otherwise parttion was concurrently evicted and should be 
re-created on next iteration.
+        }
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
index d3ec2af..cd69494 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java
@@ -20,7 +20,6 @@ package 
org.apache.ignite.internal.processors.cache.distributed.dht;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CopyOnWriteArrayList;
@@ -32,13 +31,9 @@ import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.IgniteInterruptedCheckedException;
 import org.apache.ignite.internal.NodeStoppingException;
 import 
org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
 import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheObject;
-import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap;
 import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
 import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
@@ -49,20 +44,15 @@ import 
org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
 import 
org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
-import org.apache.ignite.internal.util.GridCircularBuffer;
 import org.apache.ignite.internal.processors.query.QueryUtils;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
 import org.apache.ignite.internal.util.lang.GridIterator;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.CI1;
-import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentLinkedDeque8;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
@@ -983,8 +973,6 @@ public class GridDhtLocalPartition extends 
GridCacheConcurrentMapImpl implements
         while (true) {
             long state = this.state.get();
 
-            assert getPartState(state) != EVICTED;
-
             if (this.state.compareAndSet(state, setSize(state, getSize(state) 
+ 1)))
                 return;
         }

Reply via email to