ignite-5009
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52aa0ab4 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52aa0ab4 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52aa0ab4 Branch: refs/heads/ignite-5009 Commit: 52aa0ab402b66931ee55737acca21ce7eb721041 Parents: 1214d7e Author: sboikov <[email protected]> Authored: Mon Apr 24 15:23:24 2017 +0300 Committer: sboikov <[email protected]> Committed: Mon Apr 24 15:56:05 2017 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 134 +++++-- .../processors/cache/GridCacheContext.java | 13 + .../processors/cache/GridCacheEventManager.java | 32 ++ .../cache/IgniteCacheExpiryPolicy.java | 5 + .../cache/IgniteCacheOffheapManager.java | 7 + .../cache/IgniteCacheOffheapManagerImpl.java | 10 + .../dht/GridPartitionedGetFuture.java | 158 +++++--- .../dht/GridPartitionedSingleGetFuture.java | 141 ++++--- .../dht/atomic/GridDhtAtomicCache.java | 220 +++++++---- .../dht/colocated/GridDhtColocatedCache.java | 218 ++++++---- .../local/atomic/GridLocalAtomicCache.java | 188 +++++---- .../cache/IgniteCacheNoSyncForGetTest.java | 394 +++++++++++++++++++ .../IgniteCacheExpiryPolicyAbstractTest.java | 5 +- .../testsuites/IgniteCacheTestSuite2.java | 3 + .../cache/IgniteGetFromComputeBenchmark.java | 167 ++++++++ 15 files changed, 1295 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index c9f7430..d630a2b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -1967,58 +1967,104 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); + boolean readNoEntry = ctx.readNoEntry(expiry, false); + for (KeyCacheObject key : keys) { while (true) { - GridCacheEntryEx entry = entryEx(key); - - if (entry == null) { - if (!skipVals && ctx.config().isStatisticsEnabled()) - ctx.cache().metrics0().onRead(false); - - break; - } - try { - EntryGetResult res; + EntryGetResult res = null; boolean evt = !skipVals; boolean updateMetrics = !skipVals; - if (storeEnabled) { - res = entry.innerGetAndReserveForLoad(updateMetrics, - evt, - subjId, - taskName, - expiry, - !deserializeBinary, - readerArgs); + GridCacheEntryEx entry = null; - assert res != null; + boolean skipEntry = readNoEntry; - if (res.value() == null) { - if (misses == null) - misses = new HashMap<>(); + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(key); - misses.put(key, res); + if (row != null) { + long expireTime = row.expireTime(); - res = null; + if (expireTime != 0) { + if (expireTime > U.currentTimeMillis()) { + res = new EntryGetWithTtlResult(row.value(), + row.version(), + false, + expireTime, + 0); + } + else + skipEntry = false; + } + else + res = new EntryGetResult(row.value(), row.version(), false); } + + if (res != null) { + if (evt) { + ctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } + + if (updateMetrics && ctx.cache().configuration().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(true); + } + else if (storeEnabled) + skipEntry = false; } - else { - res = entry.innerGetVersioned( - null, - null, - updateMetrics, - evt, - subjId, - null, - taskName, - expiry, - !deserializeBinary, - readerArgs); - - if (res == null) - ctx.evicts().touch(entry, topVer); + + if (!skipEntry) { + entry = entryEx(key); + + if (entry == null) { + if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(false); + + break; + } + + if (storeEnabled) { + res = entry.innerGetAndReserveForLoad(updateMetrics, + evt, + subjId, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + assert res != null; + + if (res.value() == null) { + if (misses == null) + misses = new HashMap<>(); + + misses.put(key, res); + + res = null; + } + } + else { + res = entry.innerGetVersioned( + null, + null, + updateMetrics, + evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary, + readerArgs); + + if (res == null) + ctx.evicts().touch(entry, topVer); + } } if (res != null) { @@ -2031,7 +2077,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V true, needVer); - if (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED)) + if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) ctx.evicts().touch(entry, topVer); if (keysSize == 1) @@ -5584,6 +5630,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return CU.toTtl(expiryPlc.getExpiryForAccess()); } + @Override public boolean hasAccessTtl() { + return CU.toTtl(expiryPlc.getExpiryForAccess()) != CU.TTL_NOT_CHANGED; + } + @Override public long forCreate() { return CU.toTtl(expiryPlc.getExpiryForCreation()); } @@ -5612,6 +5662,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V return accessTtl; } + @Override public boolean hasAccessTtl() { + return accessTtl != CU.TTL_NOT_CHANGED; + } + /** {@inheritDoc} */ @Override public long forUpdate() { return CU.TTL_NOT_CHANGED; http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java index 92c144c..921cdb6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java @@ -2035,6 +2035,19 @@ public class GridCacheContext<K, V> implements Externalizable { } /** + * Checks if it is possible to directly read data memory without entry creation (this + * is optimization to avoid unnecessary blocking synchronization on cache entry). + * + * @param expiryPlc Expiry policy for read operation. + * @param readers {@code True} if need update entry readers. + * @return {@code True} if it is possible directly read offheap instead of using {@link GridCacheEntryEx#innerGet}. + */ + public boolean readNoEntry(IgniteCacheExpiryPolicy expiryPlc, boolean readers) { + return (expiryPlc == null || !expiryPlc.hasAccessTtl()) && + !readers; + } + + /** * @return {@code True} if fast eviction is allowed. */ public boolean allowFastEviction() { http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java index be5b539..687b132 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java @@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED; import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED; import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED; @@ -62,6 +63,37 @@ public class GridCacheEventManager extends GridCacheManagerAdapter { } /** + * @param key Key for event. + * @param tx Possible surrounding transaction. + * @param val Read value. + * @param subjId Subject ID. + * @param taskName Task name. + * @param keepBinary Keep binary flag. + */ + public void readEvent(KeyCacheObject key, + @Nullable IgniteInternalTx tx, + @Nullable CacheObject val, + @Nullable UUID subjId, + @Nullable String taskName, + boolean keepBinary) { + if (isRecordable(EVT_CACHE_OBJECT_READ)) { + addEvent(cctx.affinity().partition(key), + key, + tx, + null, + EVT_CACHE_OBJECT_READ, + val, + val != null, + val, + val != null, + subjId, + null, + taskName, + keepBinary); + } + } + + /** * @param part Partition. * @param key Key for the event. * @param tx Possible surrounding transaction. http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java index f82c5f0..96f1c6f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java @@ -46,6 +46,11 @@ public interface IgniteCacheExpiryPolicy { public long forAccess(); /** + * @return {@code True} if expiry policy change ttl on entry read. + */ + public boolean hasAccessTtl(); + + /** * Callback for ttl update on entry access. * * @param key Entry key. http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java index 4a98f6a..6a43e4c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java @@ -71,6 +71,13 @@ public interface IgniteCacheOffheapManager extends GridCacheManager { @Nullable public CacheDataRow read(GridCacheMapEntry entry) throws IgniteCheckedException; /** + * @param key Key. + * @return Cached row, if available, null otherwise. + * @throws IgniteCheckedException If failed. + */ + @Nullable public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException; + + /** * @param p Partition. * @return Data store. * @throws IgniteCheckedException If failed. http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java index 73edbe1..6ca7f91 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java @@ -362,6 +362,16 @@ public class IgniteCacheOffheapManagerImpl extends GridCacheManagerAdapter imple } /** {@inheritDoc} */ + @Nullable @Override public CacheDataRow read(KeyCacheObject key) throws IgniteCheckedException { + if (cctx.isLocal()) + return locCacheDataStore.find(key); + + GridDhtLocalPartition part = cctx.topology().localPartition(cctx.affinity().partition(key), null, false); + + return part != null ? dataStore(part).find(key) : null; + } + + /** {@inheritDoc} */ @Override public boolean containsKey(GridCacheMapEntry entry) { try { return read(entry) != null; http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 4dc4eb4..362432c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -445,75 +446,109 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda GridDhtCacheAdapter<K, V> cache = cache(); + boolean readNoEntry = cctx.readNoEntry(expiryPlc, false); + boolean evt = !skipVals; + while (true) { try { - GridCacheEntryEx entry = cache.entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); + boolean skipEntry = readNoEntry; + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (readNoEntry) { + CacheDataRow row = cctx.offheap().read(key); + + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + v = row.value(); + + if (needVer) + ver = row.version(); + + if (evt) { + cctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + skipEntry = false; } - else { - v = entry.innerGet( - null, - null, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } + } - cache.context().evicts().touch(entry, topVer); + if (!skipEntry) { + GridCacheEntryEx entry = cache.entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - cache.removeEntry(entry); - } - else { - cctx.addResult(locVals, - key, - v, - skipVals, - keepCacheObjects, - deserializeBinary, - true, - getRes, - ver, - 0, - 0, - needVer); - - return true; + cache.context().evicts().touch(entry, topVer); + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + cache.removeEntry(entry); + } } } + if (v != null) { + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + + return true; + } + boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); // Entry not found, do not continue search if topology did not change and there is no store. @@ -604,9 +639,6 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda */ private class MiniFuture extends GridFutureAdapter<Map<K, V>> { /** */ - private static final long serialVersionUID = 0L; - - /** */ private final IgniteUuid futId = IgniteUuid.randomUuid(); /** Node ID. */ http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index dbf1fe1..63ed9a8 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -41,6 +41,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; @@ -288,7 +289,7 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec expiryPlc != null ? expiryPlc.forCreate() : -1L, expiryPlc != null ? expiryPlc.forAccess() : -1L, skipVals, - /**add reader*/false, + /*add reader*/false, needVer, cctx.deploymentEnabled(), recovery); @@ -347,67 +348,101 @@ public class GridPartitionedSingleGetFuture extends GridCacheFutureAdapter<Objec GridDhtCacheAdapter colocated = cctx.dht(); + boolean readNoEntry = cctx.readNoEntry(expiryPlc, false); + boolean evt = !skipVals; + while (true) { try { - GridCacheEntryEx entry = colocated.entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - EntryGetResult res = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - true, - null); - - if (res != null) { - v = res.value(); - ver = res.version(); + CacheObject v = null; + GridCacheVersion ver = null; + + boolean skipEntry = readNoEntry; + + if (readNoEntry) { + CacheDataRow row = cctx.offheap().read(key); + + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + v = row.value(); + + if (needVer) + ver = row.version(); + + if (evt) { + cctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + skipEntry = false; } - else { - v = entry.innerGet( - null, - null, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - true); - } + } - colocated.context().evicts().touch(entry, topVer); + if (!skipEntry) { + GridCacheEntryEx entry = colocated.entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + if (needVer) { + EntryGetResult res = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + true, + null); + + if (res != null) { + v = res.value(); + ver = res.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiryPlc, + true); + } + + colocated.context().evicts().touch(entry, topVer); - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - if (isNew && entry.markObsoleteIfEmpty(ver)) - colocated.removeEntry(entry); + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + colocated.removeEntry(entry); + } } - else { - if (!skipVals && cctx.config().isStatisticsEnabled()) - cctx.cache().metrics0().onRead(true); + } - if (!skipVals) - setResult(v, ver); - else - setSkipValueResult(true, ver); + if (v != null) { + if (!skipVals && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); - return true; - } + if (!skipVals) + setResult(v, ver); + else + setSkipValueResult(true, ver); + + return true; } boolean topStable = cctx.isReplicated() || topVer.equals(cctx.topology().topologyVersion()); http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index e477592..1d32edd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -61,6 +61,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; @@ -1473,105 +1474,156 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); + final boolean evt = !skipVals; + // Optimisation: try to resolve value locally and escape 'get future' creation. if (!forcePrimary && ctx.affinityNode()) { - Map<K, V> locVals = U.newHashMap(keys.size()); - - boolean success = true; - - // Optimistically expect that all keys are available locally (avoid creation of get future). - for (KeyCacheObject key : keys) { - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); - - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; - - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /*update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, + try { + Map<K, V> locVals = U.newHashMap(keys.size()); + + boolean success = true; + boolean readNoEntry = ctx.readNoEntry(expiry, false); + + // Optimistically expect that all keys are available locally (avoid creation of get future). + for (KeyCacheObject key : keys) { + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(key); + + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + ctx.addResult(locVals, + key, + row.value(), + skipVals, + false, + deserializeBinary, true, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); - } - } - else { - v = entry.innerGet( null, - null, - /*read-through*/false, - /*update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - GridCacheVersion obsoleteVer = context().versions().next(); - - if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeEntry(entry); - - success = false; + row.version(), + 0, + 0, + needVer); + + if (evt) { + ctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } else - ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, - getRes, ver, 0, 0, needVer); + success = false; } else success = false; - - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. } - catch (GridDhtInvalidPartitionException ignored) { - success = false; + else { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + true, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(context().versions().next())) + removeEntry(entry); + + success = false; + } + else { + ctx.addResult(locVals, + key, + v, + skipVals, + false, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + } + } + else + success = false; - break; // While. - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - finally { - if (entry != null) - ctx.evicts().touch(entry, topVer); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + catch (GridDhtInvalidPartitionException ignored) { + success = false; + + break; // While. + } + finally { + if (entry != null) + ctx.evicts().touch(entry, topVer); + } + } } - } - if (!success) - break; - else if (!skipVals && ctx.config().isStatisticsEnabled()) - metrics0().onRead(true); - } + if (!success) + break; + else if (!skipVals && ctx.config().isStatisticsEnabled()) + metrics0().onRead(true); + } - if (success) { - sendTtlUpdateRequest(expiry); + if (success) { + sendTtlUpdateRequest(expiry); - return new GridFinishedFuture<>(locVals); + return new GridFinishedFuture<>(locVals); + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 2292cb2..4163376 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; @@ -458,118 +459,161 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc = expiryPolicy(null); // Optimisation: try to resolve value locally and escape 'get future' creation. - if (!forcePrimary) { - Map<K, V> locVals = null; + if (!forcePrimary && ctx.affinityNode()) { + try { + Map<K, V> locVals = null; - boolean success = true; + boolean success = true; + boolean readNoEntry = ctx.readNoEntry(expiryPlc, false); + boolean evt = !skipVals; - // Optimistically expect that all keys are available locally (avoid creation of get future). - for (KeyCacheObject key : keys) { - GridCacheEntryEx entry = null; - - while (true) { - try { - entry = entryEx(key); - - // If our DHT cache do has value, then we peek it. - if (entry != null) { - boolean isNew = entry.isNewLocked(); + for (KeyCacheObject key : keys) { + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(key); - EntryGetResult getRes = null; - CacheObject v = null; - GridCacheVersion ver = null; + if (row != null) { + long expireTime = row.expireTime(); - if (needVer) { - getRes = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary, - null); - - if (getRes != null) { - v = getRes.value(); - ver = getRes.version(); - } - } - else { - v = entry.innerGet( - null, - null, - /*read-through*/false, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiryPlc, - !deserializeBinary); - } - - // Entry was not in memory or in swap, so we remove it from cache. - if (v == null) { - GridCacheVersion obsoleteVer = context().versions().next(); - - if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) - removeEntry(entry); - - success = false; - } - else { + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { if (locVals == null) locVals = U.newHashMap(keys.size()); ctx.addResult(locVals, key, - v, + row.value(), skipVals, - keepCacheObj, + false, deserializeBinary, true, - getRes, - ver, + null, + row.version(), 0, 0, needVer); + + if (evt) { + ctx.events().readEvent(key, + null, + row.value(), + subjId, + taskName, + !deserializeBinary); + } } + else + success = false; } else success = false; - - break; // While. } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. - } - catch (GridDhtInvalidPartitionException ignored) { - success = false; + else { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = entryEx(key); + + // If our DHT cache do has value, then we peek it. + if (entry != null) { + boolean isNew = entry.isNewLocked(); + + EntryGetResult getRes = null; + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + getRes = entry.innerGetVersioned( + null, + null, + /*update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary, + null); + + if (getRes != null) { + v = getRes.value(); + ver = getRes.version(); + } + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc, + !deserializeBinary); + } + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + GridCacheVersion obsoleteVer = context().versions().next(); + + if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) + removeEntry(entry); + + success = false; + } + else { + if (locVals == null) + locVals = U.newHashMap(keys.size()); + + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObj, + deserializeBinary, + true, + getRes, + ver, + 0, + 0, + needVer); + } + } + else + success = false; - break; // While. - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - finally { - if (entry != null) - context().evicts().touch(entry, topVer); + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + catch (GridDhtInvalidPartitionException ignored) { + success = false; + + break; // While. + } + finally { + if (entry != null) + context().evicts().touch(entry, topVer); + } + } + + if (!success) + break; + else if (!skipVals && ctx.config().isStatisticsEnabled()) + ctx.cache().metrics0().onRead(true); } } - if (!success) - break; - else if (!skipVals && ctx.config().isStatisticsEnabled()) - ctx.cache().metrics0().onRead(true); - } - - if (success) { - sendTtlUpdateRequest(expiryPlc); + if (success) { + sendTtlUpdateRequest(expiryPlc); - return new GridFinishedFuture<>(locVals); + return new GridFinishedFuture<>(locVals); + } + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java index e1d4484..56041ee 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java @@ -55,6 +55,7 @@ import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.database.CacheDataRow; import org.apache.ignite.internal.processors.cache.local.GridLocalCache; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -384,7 +385,7 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { UUID subjId = ctx.subjectIdPerCall(null, opCtx); - Map<K, V> vals = new HashMap<>(keys.size(), 1.0f); + Map<K, V> vals = U.newHashMap(keys.size()); if (keyCheck) validateCacheKeys(keys); @@ -392,97 +393,142 @@ public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null); boolean success = true; + boolean readNoEntry = ctx.readNoEntry(expiry, false); + final boolean evt = !skipVals; for (K key : keys) { if (key == null) throw new NullPointerException("Null key."); - GridCacheEntryEx entry = null; - KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); - while (true) { - try { - entry = entryEx(cacheKey); + boolean skipEntry = readNoEntry; - if (entry != null) { - CacheObject v; + if (readNoEntry) { + CacheDataRow row = ctx.offheap().read(cacheKey); - if (needVer) { - EntryGetResult res = entry.innerGetVersioned( - null, - null, - /**update-metrics*/false, - /*event*/!skipVals, - subjId, - null, - taskName, - expiry, - !deserializeBinary, - null); - - if (res != null) { - ctx.addResult( - vals, - cacheKey, - res, - skipVals, - false, - deserializeBinary, - true, - needVer); - } - else - success = false; - } - else { - v = entry.innerGet( - null, + if (row != null) { + long expireTime = row.expireTime(); + + if (expireTime == 0 || expireTime > U.currentTimeMillis()) { + ctx.addResult(vals, + cacheKey, + row.value(), + skipVals, + false, + deserializeBinary, + true, + null, + row.version(), + 0, + 0, + needVer); + + if (configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(true); + + if (evt) { + ctx.events().readEvent(cacheKey, null, - /*read-through*/false, - /**update-metrics*/true, - /**event*/!skipVals, + row.value(), subjId, - null, taskName, - expiry, !deserializeBinary); + } + } + else + skipEntry = false; + } + else + success = false; + } - if (v != null) { - ctx.addResult(vals, - cacheKey, - v, - skipVals, - false, - deserializeBinary, - true, + if (!skipEntry) { + GridCacheEntryEx entry = null; + + while (true) { + try { + entry = entryEx(cacheKey); + + if (entry != null) { + CacheObject v; + + if (needVer) { + EntryGetResult res = entry.innerGetVersioned( null, - 0, - 0); + null, + /*update-metrics*/false, + /*event*/evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary, + null); + + if (res != null) { + ctx.addResult( + vals, + cacheKey, + res, + skipVals, + false, + deserializeBinary, + true, + needVer); + } + else + success = false; + } + else { + v = entry.innerGet( + null, + null, + /*read-through*/false, + /*update-metrics*/true, + /*event*/evt, + subjId, + null, + taskName, + expiry, + !deserializeBinary); + + if (v != null) { + ctx.addResult(vals, + cacheKey, + v, + skipVals, + false, + deserializeBinary, + true, + null, + 0, + 0); + } + else + success = false; } - else - success = false; } - } - else { - if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) - metrics0().onRead(false); + else { + if (!storeEnabled && configuration().isStatisticsEnabled() && !skipVals) + metrics0().onRead(false); - success = false; + success = false; + } + + break; // While. + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, retry. + } + finally { + if (entry != null) + ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); } - break; // While. - } - catch (GridCacheEntryRemovedException ignored) { - // No-op, retry. + if (!success && storeEnabled) + break; } - finally { - if (entry != null) - ctx.evicts().touch(entry, ctx.affinity().affinityTopologyVersion()); - } - - if (!success && storeEnabled) - break; } } http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java new file mode 100644 index 0000000..4e2c534 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java @@ -0,0 +1,394 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import javax.cache.expiry.Duration; +import javax.cache.expiry.ModifiedExpiryPolicy; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.CacheAtomicityMode; +import org.apache.ignite.cache.CacheEntry; +import org.apache.ignite.cache.CacheEntryProcessor; +import org.apache.ignite.configuration.CacheConfiguration; +import org.apache.ignite.configuration.IgniteConfiguration; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteCallable; +import org.apache.ignite.resources.IgniteInstanceResource; +import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi; +import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder; +import org.apache.ignite.testframework.GridTestUtils; +import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; + +import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC; +import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; +import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; + +/** + * + */ +@SuppressWarnings("unchecked") +public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest { + /** IP finder. */ + private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static volatile CountDownLatch processorStartLatch; + + /** */ + private static volatile CountDownLatch hangLatch; + + /** */ + private boolean client; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER); + + cfg.setClientMode(client); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void beforeTestsStarted() throws Exception { + super.beforeTestsStarted(); + + startGrid(0); + + client = true; + + startGrid(1); + } + + /** {@inheritDoc} */ + @Override protected void afterTestsStopped() throws Exception { + stopAllGrids(); + + super.afterTestsStopped(); + } + + /** + * @throws Exception If failed. + */ + public void testAtomicGet() throws Exception { + getTest(ATOMIC); + } + + /** + * @throws Exception If failed. + */ + public void testTxGet() throws Exception { + getTest(TRANSACTIONAL); + } + + /** + * @param atomicityMode Cache atomicity mode. + * @throws Exception If failed. + */ + private void getTest(CacheAtomicityMode atomicityMode) throws Exception { + boolean getAll[] = {true, false}; + boolean cfgExpiryPlc[] = {true, false}; + boolean withExpiryPlc[] = {true, false}; + boolean heapCache[] = {true, false}; + for (boolean getAll0 : getAll) { + for (boolean expiryPlc0 : cfgExpiryPlc) { + for (boolean withExpiryPlc0 : withExpiryPlc) { + for (boolean heapCache0 : heapCache) + doGet(atomicityMode, heapCache0, getAll0, expiryPlc0, withExpiryPlc0); + } + } + } + } + + /** + * @param atomicityMode Cache atomicity mode. + * @param heapCache Heap cache flag. + * @param getAll Test getAll flag. + * @param cfgExpiryPlc Configured expiry policy flag. + * @param withExpiryPlc Custom expiry policy flag. + * @throws Exception If failed. + */ + private void doGet(CacheAtomicityMode atomicityMode, + boolean heapCache, + final boolean getAll, + final boolean cfgExpiryPlc, + final boolean withExpiryPlc) throws Exception { + log.info("Test get [getAll=" + getAll + ", cfgExpiryPlc=" + cfgExpiryPlc + ']'); + + Ignite srv = ignite(0); + + Ignite client = ignite(1); + + final IgniteCache cache = client.createCache(cacheConfiguration(atomicityMode, heapCache, cfgExpiryPlc)); + + final Map<Object, Object> data = new HashMap<>(); + + data.put(1, 1); + data.put(2, 2); + + try { + // Get from compute closure. + { + cache.putAll(data); + + hangLatch = new CountDownLatch(1); + processorStartLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + if (getAll) + cache.invokeAll(data.keySet(), new HangEntryProcessor()); + else + cache.invoke(1, new HangEntryProcessor()); + + return null; + } + }); + + try { + boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS); + + assertTrue(wait); + + if (getAll) { + assertEquals(data, client.compute().affinityCall(cache.getName(), 1, + new GetAllClosure(data.keySet(), cache.getName(), withExpiryPlc))); + } + else { + assertEquals(1, client.compute().affinityCall(cache.getName(), 1, + new GetClosure(1, cache.getName(), withExpiryPlc))); + } + + hangLatch.countDown(); + + fut.get(); + } + finally { + hangLatch.countDown(); + } + } + + // Local get. + { + cache.putAll(data); + + hangLatch = new CountDownLatch(1); + processorStartLatch = new CountDownLatch(1); + + IgniteInternalFuture fut = GridTestUtils.runAsync(new Callable<Void>() { + @Override public Void call() throws Exception { + if (getAll) + cache.invokeAll(data.keySet(), new HangEntryProcessor()); + else + cache.invoke(1, new HangEntryProcessor()); + + return null; + } + }); + + try { + boolean wait = processorStartLatch.await(30, TimeUnit.SECONDS); + + assertTrue(wait); + + IgniteCache srvCache = srv.cache(cache.getName()); + + if (withExpiryPlc) + srvCache = srvCache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create()); + + if (getAll) { + assertEquals(data, srvCache.getAll(data.keySet())); + assertEquals(data.size(), srvCache.getEntries(data.keySet()).size()); + } + else { + assertEquals(1, srvCache.get(1)); + assertEquals(1, srvCache.getEntry(1).getValue()); + } + + hangLatch.countDown(); + + fut.get(); + } + finally { + hangLatch.countDown(); + } + } + } + finally { + client.destroyCache(cache.getName()); + } + } + + /** + * @param atomicityMode Atomicity mode. + * @param heapCache Heap cache flag. + * @param expiryPlc Expiry policy flag. + * @return Cache configuration. + */ + private CacheConfiguration cacheConfiguration(CacheAtomicityMode atomicityMode, + boolean heapCache, + boolean expiryPlc) { + CacheConfiguration ccfg = new CacheConfiguration(); + + ccfg.setAtomicityMode(atomicityMode); + ccfg.setOnheapCacheEnabled(heapCache); + ccfg.setWriteSynchronizationMode(FULL_SYNC); + ccfg.setName("testCache"); + + if (expiryPlc) + ccfg.setExpiryPolicyFactory(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES)); + + return ccfg; + } + + /** + * + */ + static class HangEntryProcessor implements CacheEntryProcessor { + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... arguments) { + assert processorStartLatch != null; + assert hangLatch != null; + + try { + processorStartLatch.countDown(); + + if (!hangLatch.await(60, TimeUnit.SECONDS)) + throw new RuntimeException("Failed to wait for latch"); + } + catch (Exception e) { + System.out.println("Unexpected error: " + e); + + throw new EntryProcessorException(e); + } + + entry.setValue(U.currentTimeMillis()); + + return null; + } + } + + /** + * + */ + public static class GetClosure implements IgniteCallable<Object> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final int key; + + /** */ + private final String cacheName; + + /** */ + private final boolean withExpiryPlc; + + /** + * @param key Key. + * @param cacheName Cache name. + * @param withExpiryPlc Custom expiry policy flag. + */ + GetClosure(int key, String cacheName, boolean withExpiryPlc) { + this.key = key; + this.cacheName = cacheName; + this.withExpiryPlc = withExpiryPlc; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + IgniteCache cache = ignite.cache(cacheName); + + if (withExpiryPlc) + cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create()); + + Object val = cache.get(key); + + CacheEntry e = cache.getEntry(key); + + assertEquals(val, e.getValue()); + + return val; + } + } + + /** + * + */ + public static class GetAllClosure implements IgniteCallable<Object> { + /** */ + @IgniteInstanceResource + private Ignite ignite; + + /** */ + private final Set<Object> keys; + + /** */ + private final String cacheName; + + /** */ + private final boolean withExpiryPlc; + + /** + * @param keys Keys. + * @param cacheName Cache name. + * @param withExpiryPlc Custom expiry policy flag. + */ + GetAllClosure(Set<Object> keys, String cacheName, boolean withExpiryPlc) { + this.keys = keys; + this.cacheName = cacheName; + this.withExpiryPlc = withExpiryPlc; + } + + /** {@inheritDoc} */ + @Override public Object call() throws Exception { + IgniteCache cache = ignite.cache(cacheName); + + if (withExpiryPlc) + cache = cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create()); + + Map vals = cache.getAll(keys); + + Collection<CacheEntry> entries = cache.getEntries(keys); + + assertEquals(vals.size(), entries.size()); + + for (CacheEntry entry : entries) { + Object val = vals.get(entry.getKey()); + + assertEquals(val, entry.getValue()); + } + + return vals; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java index 5d01716..5be7e07 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java @@ -146,7 +146,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs assertEquals(0, pSize); } - /** * @throws Exception If failed. + /** + * @throws Exception If failed. */ public void testZeroOnCreate() throws Exception { factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO); @@ -1012,7 +1013,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest extends IgniteCacheAbs if(cacheMode() != PARTITIONED) return; - factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS,1)); + factory = CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, 2)); nearCache = true; http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java index 89e8f01..4ac3b3b 100644 --- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java +++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java @@ -40,6 +40,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf import org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest; import org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest; import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest; +import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest; import org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest; import org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop; import org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest; @@ -265,6 +266,8 @@ public class IgniteCacheTestSuite2 extends TestSuite { suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class)); + suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class)); + return suite; } }
