ignite-4932 When possible, cache 'get' reads directly from offheap, without creating a cache entry.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/01671827 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/01671827 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/01671827 Branch: refs/heads/master Commit: 01671827411ed6043e6bfb80514e3ff57fb40b18 Parents: 7ea5830 Author: sboikov <[email protected]> Authored: Fri May 12 09:06:48 2017 +0300 Committer: sboikov <[email protected]> Committed: Fri May 12 09:06:48 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 129 ++++-- .../cache/GridCacheConcurrentMap.java | 2 - .../cache/GridCacheConcurrentMapImpl.java | 5 +- .../processors/cache/GridCacheContext.java | 12 + .../processors/cache/GridCacheEventManager.java | 32 ++ .../processors/cache/GridCacheMapEntry.java | 14 +- .../cache/GridCacheMapEntryFactory.java | 6 +- .../processors/cache/GridNoStorageCacheMap.java | 8 +- .../cache/IgniteCacheOffheapManager.java | 7 + .../cache/IgniteCacheOffheapManagerImpl.java | 17 + .../distributed/GridDistributedCacheEntry.java | 8 +- .../dht/GridCachePartitionedConcurrentMap.java | 10 +- .../distributed/dht/GridDhtCacheAdapter.java | 8 +- .../distributed/dht/GridDhtCacheEntry.java | 8 +- .../distributed/dht/GridDhtLocalPartition.java | 4 +- .../dht/GridPartitionedGetFuture.java | 158 +++++--- .../dht/GridPartitionedSingleGetFuture.java | 141 ++++--- .../dht/atomic/GridDhtAtomicCache.java | 226 ++++++----- .../dht/atomic/GridDhtAtomicCacheEntry.java | 11 +- .../dht/colocated/GridDhtColocatedCache.java | 222 ++++++----- .../colocated/GridDhtColocatedCacheEntry.java | 11 +- .../colocated/GridDhtDetachedCacheEntry.java | 10 +- .../distributed/near/GridNearCacheAdapter.java | 6 +- .../distributed/near/GridNearCacheEntry.java | 8 +- .../cache/distributed/near/GridNearTxLocal.java | 7 +- .../processors/cache/local/GridLocalCache.java | 6 +- 
.../cache/local/GridLocalCacheEntry.java | 8 +- .../local/atomic/GridLocalAtomicCache.java | 188 +++++---- .../ignite/spi/discovery/tcp/ServerImpl.java | 2 + .../cache/IgniteCacheNoSyncForGetTest.java | 395 +++++++++++++++++++ .../IgniteCacheExpiryPolicyAbstractTest.java | 2 +- .../loadtests/hashmap/GridHashMapLoadTest.java | 4 +- .../testsuites/IgniteCacheTestSuite2.java | 3 + .../cache/IgniteGetFromComputeBenchmark.java | 167 ++++++++ 34 files changed, 1339 insertions(+), 506 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 0b1ab74..694f4b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -944,7 +944,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Entry (never {@code null}). 
*/ public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) { - GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, null, true, false); + GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(topVer, key, true, false); assert e != null; @@ -966,7 +966,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V cur = map.putEntryIfObsoleteOrAbsent( topVer, key, - null, create, touch); } @@ -1965,58 +1964,104 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); + boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null); + for (KeyCacheObject key : keys) { while (true) { - GridCacheEntryEx entry = entryEx(key); - - if (entry == null) { - if (!skipVals && ctx.config().isStatisticsEnabled()) - ctx.cache().metrics0().onRead(false); - - break; - } - try { - EntryGetResult res; + EntryGetResult res = null; boolean evt = !skipVals; boolean updateMetrics = !skipVals; - if (storeEnabled) { - res = entry.innerGetAndReserveForLoad(updateMetrics, - evt, - subjId, - taskName, - expiry, - !deserializeBinary, - readerArgs); + GridCacheEntryEx entry = null; - assert res != null; + boolean skipEntry = readNoEntry; - if (res.value() == null) { - if (misses == null) - misses = new HashMap<>(); + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(key); - misses.put(key, res); + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime != 0) { + if (expireTime > U.currentTimeMillis()) { + res = new EntryGetWithTtlResult(row.value(), + row.version(), + false, + expireTime, + 0); + } + else + skipEntry = false; + } + else + res = new EntryGetResult(row.value(), row.version(), false); + } + + if (res != null) { + if (evt) { + ctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } - res = null; + if (updateMetrics && 
ctx.cache().configuration().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(true); } + else if (storeEnabled) + skipEntry = false; } - else { - res = entry.innerGetVersioned( - null, - null, - updateMetrics, - evt, - subjId, - null, - taskName, - expiry, - !deserializeBinary, - readerArgs); - - if (res == null) - ctx.evicts().touch(entry, topVer); + + if (!skipEntry) { + entry = entryEx(key); + + if (entry == null) { + if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(false); + + break; + } + + if (storeEnabled) { + res = entry.innerGetAndReserveForLoad(updateMetrics, + evt, + subjId, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + assert res != null; + + if (res.value() == null) { + if (misses == null) + misses = new HashMap<>(); + + misses.put(key, res); + + res = null; + } + } + else { + res = entry.innerGetVersioned( + null, + null, + updateMetrics, + evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + if (res == null) + ctx.evicts().touch(entry, topVer); + } } if (res != null) { @@ -2029,7 +2074,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V true, needVer); - if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) + if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) ctx.evicts().touch(entry, topVer); if (keysSize == 1) http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java index 9378f74..0fe5c9f 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMap.java @@ -38,7 +38,6 @@ public interface GridCacheConcurrentMap { /** * @param topVer Topology version. * @param key Key. - * @param val Value. * @param create Create flag. * @return Existing or new GridCacheMapEntry. Will return {@code null} if entry is obsolete or absent and create * flag is set to {@code false}. Will also return {@code null} if create flag is set to {@code true}, but entry @@ -47,7 +46,6 @@ public interface GridCacheConcurrentMap { @Nullable public GridCacheMapEntry putEntryIfObsoleteOrAbsent( AffinityTopologyVersion topVer, KeyCacheObject key, - @Nullable CacheObject val, boolean create, boolean touch); http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java index 76d961a..2c262df 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheConcurrentMapImpl.java @@ -107,7 +107,6 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM /** {@inheritDoc} */ @Nullable @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(final AffinityTopologyVersion topVer, KeyCacheObject key, - @Nullable final CacheObject val, final boolean create, final boolean touch) { GridCacheMapEntry cur = null; @@ -135,7 +134,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM 
reserved = true; } - created0 = factory.create(ctx, topVer, key, key.hashCode(), val); + created0 = factory.create(ctx, topVer, key); } cur = created = created0; @@ -158,7 +157,7 @@ public abstract class GridCacheConcurrentMapImpl implements GridCacheConcurrentM reserved = true; } - created0 = factory.create(ctx, topVer, key, key.hashCode(), val); + created0 = factory.create(ctx, topVer, key); } cur = created = created0; http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 8d562c5..a0489fc 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -2042,6 +2042,18 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * Checks if it is possible to directly read data memory without entry creation (this + * is optimization to avoid unnecessary blocking synchronization on cache entry). + * + * @param expiryPlc Optional expiry policy for read operation. + * @param readers {@code True} if need update near cache readers. + * @return {@code True} if it is possible to directly read offheap instead of using {@link GridCacheEntryEx#innerGet}. + */ + public boolean readNoEntry(@Nullable IgniteCacheExpiryPolicy expiryPlc, boolean readers) { + return !config().isOnheapCacheEnabled() && !readers && expiryPlc == null; + } + + /** * @return {@code True} if fast eviction is allowed. 
*/ public boolean allowFastEviction() { http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index be5b539..687b132 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED; @@ -62,6 +63,37 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { } /** + * @param key Key for event. + * @param tx Possible surrounding transaction. + * @param val Read value. + * @param subjId Subject ID. + * @param taskName Task name. + * @param keepBinary Keep binary flag. + */ + public void readEvent(KeyCacheObject key, + @Nullable IgniteInternalTx tx, + @Nullable CacheObject val, + @Nullable UUID subjId, + @Nullable String taskName, + boolean keepBinary) { + if (isRecordable(EVT_CACHE_OBJECT_READ)) { + addEvent(cctx.affinity().partition(key), + key, + tx, + null, + EVT_CACHE_OBJECT_READ, + val, + val != null, + val, + val != null, + subjId, + null, + taskName, + keepBinary); + } + } + + /** * @param part Partition. * @param key Key for the event. 
* @param tx Possible surrounding transaction. http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java index 21c58fa..edf90d0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntry.java @@ -169,14 +169,10 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme /** * @param cctx Cache context. * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. */ protected GridCacheMapEntry( GridCacheContext<?, ?> cctx, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { if (log == null) log = U.logger(cctx.kernalContext(), logRef, GridCacheMapEntry.class); @@ -186,15 +182,9 @@ public abstract class GridCacheMapEntry extends GridMetadataAwareAdapter impleme assert key != null; this.key = key; - this.hash = hash; + this.hash = key.hashCode(); this.cctx = cctx; - val = cctx.kernalContext().cacheObjects().prepareForCache(val, cctx); - - synchronized (this) { - value(val); - } - ver = cctx.versions().next(); startVer = ver.order(); http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java index 4ee9385..d3e3921 
100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMapEntryFactory.java @@ -27,15 +27,11 @@ public interface GridCacheMapEntryFactory { * @param ctx Cache registry. * @param topVer Topology version. * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. * @return New cache entry. */ public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java index 00827ee..14a8482 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridNoStorageCacheMap.java @@ -45,10 +45,12 @@ public class GridNoStorageCacheMap implements GridCacheConcurrentMap { } /** {@inheritDoc} */ - @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key, - @Nullable CacheObject val, boolean create, boolean touch) { + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, + KeyCacheObject key, + boolean create, + boolean touch) { if (create) - return new GridDhtCacheEntry(ctx, topVer, key, key.hashCode(), val); + return new GridDhtCacheEntry(ctx, topVer, key); else return null; } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 9eb5368..b476aeb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -71,6 +71,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { @Nullable public CacheDataRow read(GridCacheMapEntry entry) throws IgniteCheckedException; /** + * @param key Key. + * @return Cached row, if available, null otherwise. + * @throws IgniteCheckedException If failed. + */ + @Nullable public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException; + + /** * @param p Partition. * @return Data store. * @throws IgniteCheckedException If failed. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 650f65e..099840a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -362,6 +362,23 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ + @Nullable @Override public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException { + CacheDataRow row; + + if (cctx.isLocal()) + row = locCacheDataStore.find(key); + else { + GridDhtLocalPartition part = cctx.topology().localPartition(cctx.affinity().partition(key), null, false); + + row = part != null ? 
dataStore(part).find(key) : null; + } + + assert row == null || row.value() != null : row; + + return row; + } + + /** {@inheritDoc} */ @Override public boolean containsKey(GridCacheMapEntry entry) { try { return read(entry) != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java index f518934..e7675b7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheEntry.java @@ -49,16 +49,12 @@ public class GridDistributedCacheEntry extends GridCacheMapEntry { /** * @param ctx Cache context. * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. 
*/ public GridDistributedCacheEntry( GridCacheContext ctx, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - super(ctx, key, hash, val); + super(ctx, key); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java index 357bf89..f021b65 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridCachePartitionedConcurrentMap.java @@ -79,20 +79,22 @@ public class GridCachePartitionedConcurrentMap implements GridCacheConcurrentMap } /** {@inheritDoc} */ - @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, KeyCacheObject key, - @Nullable CacheObject val, boolean create, boolean touch) { + @Override public GridCacheMapEntry putEntryIfObsoleteOrAbsent(AffinityTopologyVersion topVer, + KeyCacheObject key, + boolean create, + boolean touch) { while (true) { GridDhtLocalPartition part = localPartition(key, topVer, create); if (part == null) return null; - GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, val, create, touch); + GridCacheMapEntry res = part.putEntryIfObsoleteOrAbsent(topVer, key, create, touch); if (res != null || !create) return res; - // Otherwise parttion was concurrently evicted and should be re-created on next iteration. + // Otherwise partition was concurrently evicted and should be re-created on next iteration. 
} } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 7e6ae81..121c734 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -246,11 +246,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - return new GridDhtCacheEntry(ctx, topVer, key, hash, val); + return new GridDhtCacheEntry(ctx, topVer, key); } }; } @@ -428,7 +426,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @return Cache entry. 
*/ protected GridDistributedCacheEntry createEntry(KeyCacheObject key) { - return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0); + return new GridDhtDetachedCacheEntry(ctx, key); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index 4c22090..be7805f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -76,17 +76,13 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @param ctx Cache context. * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. */ public GridDhtCacheEntry( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - super(ctx, key, hash, val); + super(ctx, key); // Record this entry with partition. 
int p = cctx.affinity().partition(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java index 5425954..4208a98 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLocalPartition.java @@ -900,7 +900,9 @@ public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements CacheDataRow row = it0.next(); GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(cctx.affinity().affinityTopologyVersion(), - row.key(), null, true, false); + row.key(), + true, + false); if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) { if (rec) { http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 4dc4eb4..362432c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -41,6 +41,7 @@ 
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -445,75 +446,109 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda GridDhtCacheAdapter<K, V> cache = cache(); + boolean readNoEntry = cctx.readNoEntry(expiryPlc, false); + boolean evt = !skipVals; + while (true) { try { - GridCacheEntryEx entry = cache.entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); + boolean skipEntry = readNoEntry; + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (readNoEntry) { + CacheDataRow row = cctx.offheap().read(key); + + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + v = row.value(); + + if (needVer) + ver = row.version(); + + if (evt) { + cctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + skipEntry = false; } - else { - v = entry.innerGet( - null, - null, - 
/*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } + } - cache.context().evicts().touch(entry, topVer); + if (!skipEntry) { + GridCacheEntryEx entry = cache.entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - cache.removeEntry(entry); - } - else { - cctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObjects, - deserializeBinary, - true, - getRes, - ver, - 0, - 0, - needVer); - - return true; + cache.context().evicts().touch(entry, topVer); + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + cache.removeEntry(entry); + } } } + if (v != null) { + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + + return true; + } + boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); // Entry not found, do not continue search if topology did not change and there is no store. 
@@ -604,9 +639,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda */ private class MiniFuture extends GridFutureAdapter<Map<K, V>> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index dbf1fe1..63ed9a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; @@ -288,7 +289,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? 
expiryPlc.forAccess() : -1L, skipVals, - /**add reader*/false, + /*add reader*/false, needVer, cctx.deploymentEnabled(), recovery); @@ -347,67 +348,101 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec GridDhtCacheAdapter colocated = cctx.dht(); + boolean readNoEntry = cctx.readNoEntry(expiryPlc, false); + boolean evt = !skipVals; + while (true) { try { - GridCacheEntryEx entry = colocated.entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - EntryGetResult res = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - true, - null); - - if (res != null) { - v = res.value(); - ver = res.version(); + CacheObject v = null; + GridCacheVersion ver = null; + + boolean skipEntry = readNoEntry; + + if (readNoEntry) { + CacheDataRow row = cctx.offheap().read(key); + + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + v = row.value(); + + if (needVer) + ver = row.version(); + + if (evt) { + cctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + skipEntry = false; } - else { - v = entry.innerGet( - null, - null, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - true); - } + } - colocated.context().evicts().touch(entry, topVer); + if (!skipEntry) { + GridCacheEntryEx entry = colocated.entryEx(key); + + // If our DHT cache do has value, then we peek it. 
+ if (entry != null) { + boolean isNew = entry.isNewLocked(); + + if (needVer) { + EntryGetResult res = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + true, + null); + + if (res != null) { + v = res.value(); + ver = res.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + true); + } + + colocated.context().evicts().touch(entry, topVer); - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeEntry(entry); + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + colocated.removeEntry(entry); + } } - else { - if (!skipVals && cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(true); + } - if (!skipVals) - setResult(v, ver); - else - setSkipValueResult(true, ver); + if (v != null) { + if (!skipVals && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); - return true; - } + if (!skipVals) + setResult(v, ver); + else + setSkipValueResult(true, ver); + + return true; } boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e477592..47572fd 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; @@ -196,11 +197,9 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - return new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val); + return new GridDhtAtomicCacheEntry(ctx, topVer, key); } }; } @@ -1473,105 +1472,156 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); + final boolean evt = !skipVals; + // Optimisation: try to resolve value locally and escape 'get future' creation. if (!forcePrimary && ctx.affinityNode()) { - Map<K, V> locVals = U.newHashMap(keys.size()); - - boolean success = true; - - // Optimistically expect that all keys are available locally (avoid creation of get future). - for (KeyCacheObject key : keys) { - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = entryEx(key); - - // If our DHT cache do has value, then we peek it. 
- if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /*update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, + try { + Map<K, V> locVals = U.newHashMap(keys.size()); + + boolean success = true; + boolean readNoEntry = ctx.readNoEntry(expiry, false); + + // Optimistically expect that all keys are available locally (avoid creation of get future). + for (KeyCacheObject key : keys) { + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(key); + + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + ctx.addResult(locVals, + key, + row.value(), + skipVals, + false, + deserializeBinary, true, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); - } - } - else { - v = entry.innerGet( null, - null, - /*read-through*/false, - /*update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - GridCacheVersion obsoleteVer = context().versions().next(); - - if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeEntry(entry); - - success = false; + row.version(), + 0, + 0, + needVer); + + if (evt) { + ctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } else - ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, - getRes, ver, 0, 0, needVer); + success = false; } else success = false; - - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. 
} - catch (GridDhtInvalidPartitionException ignored) { - success = false; + else { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + true, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(context().versions().next())) + removeEntry(entry); + + success = false; + } + else { + ctx.addResult(locVals, + key, + v, + skipVals, + false, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + } + } + else + success = false; - break; // While. - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - finally { - if (entry != null) - ctx.evicts().touch(entry, topVer); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + catch (GridDhtInvalidPartitionException ignored) { + success = false; + + break; // While. 
+ } + finally { + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } } - } - if (!success) - break; - else if (!skipVals && ctx.config().isStatisticsEnabled()) - metrics0().onRead(true); - } + if (!success) + break; + else if (!skipVals && ctx.config().isStatisticsEnabled()) + metrics0().onRead(true); + } - if (success) { - sendTtlUpdateRequest(expiry); + if (success) { + sendTtlUpdateRequest(expiry); - return new GridFinishedFuture<>(locVals); + return new GridFinishedFuture<>(locVals); + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java index 3f014d5..b0c9a64 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCacheEntry.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -33,17 +32,13 @@ public class GridDhtAtomicCacheEntry extends GridDhtCacheEntry { * @param ctx Cache context. 
* @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. */ - public GridDhtAtomicCacheEntry( + GridDhtAtomicCacheEntry( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - super(ctx, topVer, key, hash, val); + super(ctx, topVer, key); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 2292cb2..12a3912 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; @@ -120,11 +121,9 @@ public class 
GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - return new GridDhtColocatedCacheEntry(ctx, topVer, key, hash, val); + return new GridDhtColocatedCacheEntry(ctx, topVer, key); } }; } @@ -458,118 +457,161 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc = expiryPolicy(null); // Optimisation: try to resolve value locally and escape 'get future' creation. - if (!forcePrimary) { - Map<K, V> locVals = null; + if (!forcePrimary && ctx.affinityNode()) { + try { + Map<K, V> locVals = null; - boolean success = true; + boolean success = true; + boolean readNoEntry = ctx.readNoEntry(expiryPlc, false); + boolean evt = !skipVals; - // Optimistically expect that all keys are available locally (avoid creation of get future). - for (KeyCacheObject key : keys) { - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); - } - } - else { - v = entry.innerGet( - null, - null, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. 
- if (v == null) { - GridCacheVersion obsoleteVer = context().versions().next(); + for (KeyCacheObject key : keys) { + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(key); - if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeEntry(entry); + if (row != null) { + long expireTime = row.expireTime(); - success = false; - } - else { + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { if (locVals == null) locVals = U.newHashMap(keys.size()); ctx.addResult(locVals, key, - v, + row.value(), skipVals, keepCacheObj, deserializeBinary, true, - getRes, - ver, + null, + row.version(), 0, 0, needVer); + + if (evt) { + ctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + success = false; } else success = false; - - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. } - catch (GridDhtInvalidPartitionException ignored) { - success = false; + else { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. 
+ if (v == null) { + GridCacheVersion obsoleteVer = context().versions().next(); + + if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) + removeEntry(entry); + + success = false; + } + else { + if (locVals == null) + locVals = U.newHashMap(keys.size()); + + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObj, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + } + } + else + success = false; - break; // While. - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - finally { - if (entry != null) - context().evicts().touch(entry, topVer); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + catch (GridDhtInvalidPartitionException ignored) { + success = false; + + break; // While. + } + finally { + if (entry != null) + context().evicts().touch(entry, topVer); + } + } } - } - if (!success) - break; - else if (!skipVals && ctx.config().isStatisticsEnabled()) - ctx.cache().metrics0().onRead(true); - } + if (!success) + break; + else if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(true); + } - if (success) { - sendTtlUpdateRequest(expiryPlc); + if (success) { + sendTtlUpdateRequest(expiryPlc); - return new GridFinishedFuture<>(locVals); + return new GridFinishedFuture<>(locVals); + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java index cc71e11..f7cc5a7 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCacheEntry.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; -import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -32,17 +31,13 @@ public class GridDhtColocatedCacheEntry extends GridDhtCacheEntry { * @param ctx Cache context. * @param topVer Topology version at the time of creation (if negative, then latest topology is assumed). * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. 
*/ - public GridDhtColocatedCacheEntry( + GridDhtColocatedCacheEntry( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - super(ctx, topVer, key, hash, val); + super(ctx, topVer, key); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java index ac81b63..404265d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtDetachedCacheEntry.java @@ -20,7 +20,6 @@ package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; -import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.database.CacheDataRow; @@ -36,14 +35,9 @@ public class GridDhtDetachedCacheEntry extends GridDistributedCacheEntry { /** * @param ctx Cache context. * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. - * @param next Next entry in the linked list. - * @param hdrId Header ID. 
*/ - public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key, int hash, CacheObject val, - GridCacheMapEntry next, int hdrId) { - super(ctx, key, hash, val); + public GridDhtDetachedCacheEntry(GridCacheContext ctx, KeyCacheObject key) { + super(ctx, key); } /** http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 59d986a..0b25f58 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -95,11 +95,9 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - return new GridNearCacheEntry(ctx, key, hash, val); + return new GridNearCacheEntry(ctx, key); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java index fa098df..b17d0b5 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java @@ -66,16 +66,12 @@ public class GridNearCacheEntry extends GridDistributedCacheEntry { /** * @param ctx Cache context. * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. */ public GridNearCacheEntry( GridCacheContext ctx, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - super(ctx, key, hash, val); + super(ctx, key); part = ctx.affinity().partition(key); } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java index f795ddc..9ad084e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java @@ -3878,17 +3878,16 @@ public class GridNearTxLocal extends GridDhtTxLocalAdapter implements AutoClosea /** * @param cctx Cache context. * @param key Key. - * @param val Value. * @param filter Filter. * @return {@code True} if filter passed. 
*/ private boolean isAll(GridCacheContext cctx, KeyCacheObject key, - CacheObject val, + final CacheObject val0, CacheEntryPredicate[] filter) { - GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key, 0, val, null, 0) { + GridCacheEntryEx e = new GridDhtDetachedCacheEntry(cctx, key) { @Nullable @Override public CacheObject peekVisibleValue() { - return rawGet(); + return val0; } }; http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java index 94f618a..5e3dc3f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCache.java @@ -86,11 +86,9 @@ public class GridLocalCache<K, V> extends GridCacheAdapter<K, V> { @Override public GridCacheMapEntry create( GridCacheContext ctx, AffinityTopologyVersion topVer, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - return new GridLocalCacheEntry(ctx, key, hash, val); + return new GridLocalCacheEntry(ctx, key); } }; } http://git-wip-us.apache.org/repos/asf/ignite/blob/01671827/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java index 3e93917..421b32a 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/GridLocalCacheEntry.java @@ -38,16 +38,12 @@ public class GridLocalCacheEntry extends GridCacheMapEntry { /** * @param ctx Cache registry. * @param key Cache key. - * @param hash Key hash value. - * @param val Entry value. */ GridLocalCacheEntry( GridCacheContext ctx, - KeyCacheObject key, - int hash, - CacheObject val + KeyCacheObject key ) { - super(ctx, key, hash, val); + super(ctx, key); } /** {@inheritDoc} */
