ignite-4851 : Partition will be reserved before new entry is created.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/c61d1387 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/c61d1387 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/c61d1387 Branch: refs/heads/ignite-3477-master Commit: c61d13875de9e199ebe68145dabd6a88544d7673 Parents: 17bc34d Author: Ilya Lantukh <[email protected]> Authored: Wed Apr 5 13:39:58 2017 +0300 Committer: Ilya Lantukh <[email protected]> Committed: Wed Apr 5 13:39:58 2017 +0300 ---------------------------------------------------------------------- .../cache/GridCacheConcurrentMap.java | 5 +- .../cache/GridCacheConcurrentMapImpl.java | 192 +++++++++++-------- .../dht/GridCachePartitionedConcurrentMap.java | 15 +- .../distributed/dht/GridDhtLocalPartition.java | 12 -- 4 files changed, 130 insertions(+), 94 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index debc65b..a6738f3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -40,8 +40,9 @@ public interface GridCacheConcurrentMap { * @param key Key. * @param val Value. * @param create Create flag. - * @return Triple where the first element is current entry associated with the key, - * the second is created entry and the third is doomed (all may be null). + * @return Existing or new GridCacheMapEntry. Will return {@code null} if entry is obsolete or absent and create + * flag is set to {@code false}. Will also return {@code null} if create flag is set to {@code true}, but entry + * couldn't be created. */ @Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent( AffinityTopologyVersion topVer, http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java index c1dbd0c..15a688b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java @@ -23,7 +23,6 @@ import java.util.Collection; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.lang.IgnitePredicate; @@ -118,104 +117,145 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM boolean done = false; - while (!done) { - GridCacheMapEntry entry = map.get(key); - created = null; - doomed = null; + boolean reserved = false; - if (entry == null) { - if (create) { - if (created0 == null) - created0 = factory.create(ctx, topVer, key, key.hashCode(), val); + try { + while (!done) { + GridCacheMapEntry entry = map.get(key); + created = null; + doomed = null; - cur = created = created0; + if (entry == null) { + if (create) { + if (created0 == null) { + if (!reserved) { + if (!reserve()) + return null; - done = map.putIfAbsent(created.key(), created) == null; - } - else - done = true; - } - else { - if (entry.obsolete()) { - doomed = entry; + reserved = true; + } - if (create) { - if (created0 == null) created0 = factory.create(ctx, topVer, key, key.hashCode(), val); + } cur = created = created0; - done = map.replace(entry.key(), doomed, created); + done = map.putIfAbsent(created.key(), created) == null; } else - done = map.remove(entry.key(), doomed); + done = true; } else { - cur = entry; + if (entry.obsolete()) { + doomed = entry; + + if (create) { + if (created0 == null) { + if (!reserved) { + if (!reserve()) + return null; + + reserved = true; + } + + created0 = factory.create(ctx, topVer, key, key.hashCode(), val); + } - done = true; + cur = created = created0; + + done = map.replace(entry.key(), doomed, created); + } + else + done = map.remove(entry.key(), doomed); + } + else { + cur = entry; + + done = true; + } } } - } - int sizeChange = 0; + int sizeChange = 0; + + if (doomed != null) { + synchronized (doomed) { + if (!doomed.deleted()) + sizeChange--; + } - if (doomed != null) { - synchronized (doomed) { - if (!doomed.deleted()) - sizeChange--; + if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) + ctx.events().addEvent(doomed.partition(), + doomed.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_ENTRY_DESTROYED, + null, + false, + null, + false, + null, + null, + null, + true); } - if (ctx.events().isRecordable(EVT_CACHE_ENTRY_DESTROYED)) - ctx.events().addEvent(doomed.partition(), - doomed.key(), - ctx.localNodeId(), - (IgniteUuid)null, - null, - EVT_CACHE_ENTRY_DESTROYED, - null, - false, - null, - false, - null, - null, - null, - true); - } + if (created != null) { + sizeChange++; + + if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED)) + ctx.events().addEvent(created.partition(), + created.key(), + ctx.localNodeId(), + (IgniteUuid)null, + null, + EVT_CACHE_ENTRY_CREATED, + null, + false, + null, + false, + null, + null, + null, + true); + + if (touch) + ctx.evicts().touch( + cur, + topVer); + } - if (created != null) { - sizeChange++; - - if (ctx.events().isRecordable(EVT_CACHE_ENTRY_CREATED)) - ctx.events().addEvent(created.partition(), - created.key(), - ctx.localNodeId(), - (IgniteUuid)null, - null, - EVT_CACHE_ENTRY_CREATED, - null, - false, - null, - false, - null, - null, - null, - true); - - if (touch) - ctx.evicts().touch( - cur, - topVer); - } + assert Math.abs(sizeChange) <= 1; + + if (sizeChange == -1) + decrementPublicSize(cur); + else if (sizeChange == 1) { + assert reserved; - assert Math.abs(sizeChange) <= 1; + incrementPublicSize(cur); + } + + return cur; + } + finally { + if (reserved) + release(); + } + } - if (sizeChange == -1) - decrementPublicSize(cur); - else if (sizeChange == 1) - incrementPublicSize(cur); + /** + * + */ + protected boolean reserve() { + return true; + } - return cur; + /** + * + */ + protected void release() { + // No-op. } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java index cfbe9bb..6230c85 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java @@ -81,12 +81,19 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap /** {@inheritDoc} */ @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key, @Nullable CacheObject val, boolean create, boolean touch) { - GridDhtLocalPartition part = localPartition(key, topVer, create); + while (true) { + GridDhtLocalPartition part = localPartition(key, topVer, create); - if (part == null) - return null; + if (part == null) + return null; + + GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch); - return part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch); + if (res != null || !create) + return res; + + // Otherwise parttion was concurrently evicted and should be re-created on next iteration. + } } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/c61d1387/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index d3ec2af..cd69494 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.Set; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -32,13 +31,9 @@ import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; import org.apache.ignite.IgniteLogger; import org.apache.ignite.internal.IgniteInternalFuture; -import org.apache.ignite.internal.IgniteInterruptedCheckedException; import org.apache.ignite.internal.NodeStoppingException; import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; -import org.apache.ignite.internal.processors.cache.CacheObject; -import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; @@ -49,20 +44,15 @@ import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; -import org.apache.ignite.internal.util.GridCircularBuffer; import org.apache.ignite.internal.processors.query.QueryUtils; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.lang.GridIterator; import org.apache.ignite.internal.util.tostring.GridToStringExclude; -import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.T2; -import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import org.jsr166.ConcurrentLinkedDeque8; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; @@ -983,8 +973,6 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements while (true) { long state = this.state.get(); - assert getPartState(state) != EVICTED; - if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1))) return; }
