ignite-5009

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/52aa0ab4
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/52aa0ab4
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/52aa0ab4

Branch: refs/heads/ignite-5009
Commit: 52aa0ab402b66931ee55737acca21ce7eb721041
Parents: 1214d7e
Author: sboikov <[email protected]>
Authored: Mon Apr 24 15:23:24 2017 +0300
Committer: sboikov <[email protected]>
Committed: Mon Apr 24 15:56:05 2017 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 134 +++++--
 .../processors/cache/GridCacheContext.java      |  13 +
 .../processors/cache/GridCacheEventManager.java |  32 ++
 .../cache/IgniteCacheExpiryPolicy.java          |   5 +
 .../cache/IgniteCacheOffheapManager.java        |   7 +
 .../cache/IgniteCacheOffheapManagerImpl.java    |  10 +
 .../dht/GridPartitionedGetFuture.java           | 158 +++++---
 .../dht/GridPartitionedSingleGetFuture.java     | 141 ++++---
 .../dht/atomic/GridDhtAtomicCache.java          | 220 +++++++----
 .../dht/colocated/GridDhtColocatedCache.java    | 218 ++++++----
 .../local/atomic/GridLocalAtomicCache.java      | 188 +++++----
 .../cache/IgniteCacheNoSyncForGetTest.java      | 394 +++++++++++++++++++
 .../IgniteCacheExpiryPolicyAbstractTest.java    |   5 +-
 .../testsuites/IgniteCacheTestSuite2.java       |   3 +
 .../cache/IgniteGetFromComputeBenchmark.java    | 167 ++++++++
 15 files changed, 1295 insertions(+), 400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index c9f7430..d630a2b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1967,58 +1967,104 @@ public abstract class GridCacheAdapter<K, V> 
implements IgniteInternalCache<K, V
 
                 final boolean storeEnabled = !skipVals && readThrough && 
ctx.readThrough();
 
+                boolean readNoEntry = ctx.readNoEntry(expiry, false);
+
                 for (KeyCacheObject key : keys) {
                     while (true) {
-                        GridCacheEntryEx entry = entryEx(key);
-
-                        if (entry == null) {
-                            if (!skipVals && 
ctx.config().isStatisticsEnabled())
-                                ctx.cache().metrics0().onRead(false);
-
-                            break;
-                        }
-
                         try {
-                            EntryGetResult res;
+                            EntryGetResult res = null;
 
                             boolean evt = !skipVals;
                             boolean updateMetrics = !skipVals;
 
-                            if (storeEnabled) {
-                                res = 
entry.innerGetAndReserveForLoad(updateMetrics,
-                                    evt,
-                                    subjId,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary,
-                                    readerArgs);
+                            GridCacheEntryEx entry = null;
 
-                                assert res != null;
+                            boolean skipEntry = readNoEntry;
 
-                                if (res.value() == null) {
-                                    if (misses == null)
-                                        misses = new HashMap<>();
+                            if (readNoEntry) {
+                                CacheDataRow row = ctx.offheap().read(key);
 
-                                    misses.put(key, res);
+                                if (row != null) {
+                                    long expireTime = row.expireTime();
 
-                                    res = null;
+                                    if (expireTime != 0) {
+                                        if (expireTime  >  
U.currentTimeMillis()) {
+                                            res = new 
EntryGetWithTtlResult(row.value(),
+                                                row.version(),
+                                                false,
+                                                expireTime,
+                                                0);
+                                        }
+                                        else
+                                            skipEntry = false;
+                                    }
+                                    else
+                                        res = new EntryGetResult(row.value(), 
row.version(), false);
                                 }
+
+                                if (res != null) {
+                                    if (evt) {
+                                        ctx.events().readEvent(key,
+                                            null,
+                                            row.value(),
+                                            subjId,
+                                            taskName,
+                                            !deserializeBinary);
+                                    }
+
+                                    if (updateMetrics && 
ctx.cache().configuration().isStatisticsEnabled())
+                                        ctx.cache().metrics0().onRead(true);
+                                }
+                                else if (storeEnabled)
+                                    skipEntry = false;
                             }
-                            else {
-                                res = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    updateMetrics,
-                                    evt,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary,
-                                    readerArgs);
-
-                                if (res == null)
-                                    ctx.evicts().touch(entry, topVer);
+
+                            if (!skipEntry) {
+                                entry = entryEx(key);
+
+                                if (entry == null) {
+                                    if (!skipVals && 
ctx.config().isStatisticsEnabled())
+                                        ctx.cache().metrics0().onRead(false);
+
+                                    break;
+                                }
+
+                                if (storeEnabled) {
+                                    res = 
entry.innerGetAndReserveForLoad(updateMetrics,
+                                        evt,
+                                        subjId,
+                                        taskName,
+                                        expiry,
+                                        !deserializeBinary,
+                                        readerArgs);
+
+                                    assert res != null;
+
+                                    if (res.value() == null) {
+                                        if (misses == null)
+                                            misses = new HashMap<>();
+
+                                        misses.put(key, res);
+
+                                        res = null;
+                                    }
+                                }
+                                else {
+                                    res = entry.innerGetVersioned(
+                                        null,
+                                        null,
+                                        updateMetrics,
+                                        evt,
+                                        subjId,
+                                        null,
+                                        taskName,
+                                        expiry,
+                                        !deserializeBinary,
+                                        readerArgs);
+
+                                    if (res == null)
+                                        ctx.evicts().touch(entry, topVer);
+                                }
                             }
 
                             if (res != null) {
@@ -2031,7 +2077,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                                     true,
                                     needVer);
 
-                                if (tx == null || (!tx.implicit() && 
tx.isolation() == READ_COMMITTED))
+                                if (entry != null && (tx == null || 
(!tx.implicit() && tx.isolation() == READ_COMMITTED)))
                                     ctx.evicts().touch(entry, topVer);
 
                                 if (keysSize == 1)
@@ -5584,6 +5630,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                     return CU.toTtl(expiryPlc.getExpiryForAccess());
                 }
 
+                @Override public boolean hasAccessTtl() {
+                    return CU.toTtl(expiryPlc.getExpiryForAccess()) != 
CU.TTL_NOT_CHANGED;
+                }
+
                 @Override public long forCreate() {
                     return CU.toTtl(expiryPlc.getExpiryForCreation());
                 }
@@ -5612,6 +5662,10 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                     return accessTtl;
                 }
 
+                @Override public boolean hasAccessTtl() {
+                    return accessTtl != CU.TTL_NOT_CHANGED;
+                }
+
                 /** {@inheritDoc} */
                 @Override public long forUpdate() {
                     return CU.TTL_NOT_CHANGED;

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 92c144c..921cdb6 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -2035,6 +2035,19 @@ public class GridCacheContext<K, V> implements 
Externalizable {
     }
 
     /**
+     * Checks if it is possible to directly read data memory without entry 
creation (this
+     * is optimization to avoid unnecessary blocking synchronization on cache 
entry).
+     *
+     * @param expiryPlc Expiry policy for read operation.
+     * @param readers {@code True} if need update entry readers.
+     * @return {@code True} if it is possible directly read offheap instead of 
using {@link GridCacheEntryEx#innerGet}.
+     */
+    public boolean readNoEntry(IgniteCacheExpiryPolicy expiryPlc, boolean 
readers) {
+        return (expiryPlc == null || !expiryPlc.hasAccessTtl()) &&
+            !readers;
+    }
+
+    /**
      * @return {@code True} if fast eviction is allowed.
      */
     public boolean allowFastEviction() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
index be5b539..687b132 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheEventManager.java
@@ -31,6 +31,7 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteUuid;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ;
 import static 
org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STARTED;
 import static org.apache.ignite.events.EventType.EVT_CACHE_STOPPED;
@@ -62,6 +63,37 @@ public class GridCacheEventManager extends 
GridCacheManagerAdapter {
     }
 
     /**
+     * @param key Key for event.
+     * @param tx Possible surrounding transaction.
+     * @param val Read value.
+     * @param subjId Subject ID.
+     * @param taskName Task name.
+     * @param keepBinary Keep binary flag.
+     */
+    public void readEvent(KeyCacheObject key,
+        @Nullable IgniteInternalTx tx,
+        @Nullable CacheObject val,
+        @Nullable UUID subjId,
+        @Nullable String taskName,
+        boolean keepBinary) {
+        if (isRecordable(EVT_CACHE_OBJECT_READ)) {
+            addEvent(cctx.affinity().partition(key),
+                key,
+                tx,
+                null,
+                EVT_CACHE_OBJECT_READ,
+                val,
+                val != null,
+                val,
+                val != null,
+                subjId,
+                null,
+                taskName,
+                keepBinary);
+        }
+    }
+
+    /**
      * @param part Partition.
      * @param key Key for the event.
      * @param tx Possible surrounding transaction.

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
index f82c5f0..96f1c6f 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheExpiryPolicy.java
@@ -46,6 +46,11 @@ public interface IgniteCacheExpiryPolicy {
     public long forAccess();
 
     /**
+     * @return {@code True} if expiry policy change ttl on entry read.
+     */
+    public boolean hasAccessTtl();
+
+    /**
      * Callback for ttl update on entry access.
      *
      * @param key Entry key.

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
index 4a98f6a..6a43e4c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManager.java
@@ -71,6 +71,13 @@ public interface IgniteCacheOffheapManager extends 
GridCacheManager {
     @Nullable public CacheDataRow read(GridCacheMapEntry entry) throws 
IgniteCheckedException;
 
     /**
+     * @param key Key.
+     * @return Cached row, if available, null otherwise.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable public CacheDataRow read(KeyCacheObject key) throws 
IgniteCheckedException;
+
+    /**
      * @param p Partition.
      * @return Data store.
      * @throws IgniteCheckedException If failed.

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index 73edbe1..6ca7f91 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -362,6 +362,16 @@ public class IgniteCacheOffheapManagerImpl extends 
GridCacheManagerAdapter imple
     }
 
     /** {@inheritDoc} */
+    @Nullable @Override public CacheDataRow read(KeyCacheObject key) throws 
IgniteCheckedException {
+        if (cctx.isLocal())
+            return locCacheDataStore.find(key);
+
+        GridDhtLocalPartition part = 
cctx.topology().localPartition(cctx.affinity().partition(key), null, false);
+
+        return part != null ? dataStore(part).find(key) : null;
+    }
+
+    /** {@inheritDoc} */
     @Override public boolean containsKey(GridCacheMapEntry entry) {
         try {
             return read(entry) != null;

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 4dc4eb4..362432c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedExceptio
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -445,75 +446,109 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
 
         GridDhtCacheAdapter<K, V> cache = cache();
 
+        boolean readNoEntry = cctx.readNoEntry(expiryPlc, false);
+        boolean evt = !skipVals;
+
         while (true) {
             try {
-                GridCacheEntryEx entry = cache.entryEx(key);
-
-                // If our DHT cache do has value, then we peek it.
-                if (entry != null) {
-                    boolean isNew = entry.isNewLocked();
-
-                    EntryGetResult getRes = null;
-                    CacheObject v = null;
-                    GridCacheVersion ver = null;
-
-                    if (needVer) {
-                        getRes = entry.innerGetVersioned(
-                            null,
-                            null,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            !deserializeBinary,
-                            null);
-
-                        if (getRes != null) {
-                            v = getRes.value();
-                            ver = getRes.version();
+                boolean skipEntry = readNoEntry;
+
+                EntryGetResult getRes = null;
+                CacheObject v = null;
+                GridCacheVersion ver = null;
+
+                if (readNoEntry) {
+                    CacheDataRow row = cctx.offheap().read(key);
+
+                    if (row != null) {
+                        long expireTime = row.expireTime();
+
+                        if (expireTime == 0 || expireTime > 
U.currentTimeMillis()) {
+                            v = row.value();
+
+                            if (needVer)
+                                ver = row.version();
+
+                            if (evt) {
+                                cctx.events().readEvent(key,
+                                    null,
+                                    row.value(),
+                                    subjId,
+                                    taskName,
+                                    !deserializeBinary);
+                            }
                         }
+                        else
+                            skipEntry = false;
                     }
-                    else {
-                        v = entry.innerGet(
-                            null,
-                            null,
-                            /*read-through*/false,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            !deserializeBinary);
-                    }
+                }
 
-                    cache.context().evicts().touch(entry, topVer);
+                if (!skipEntry) {
+                    GridCacheEntryEx entry = cache.entryEx(key);
+
+                    // If our DHT cache do has value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        if (needVer) {
+                            getRes = entry.innerGetVersioned(
+                                null,
+                                null,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                !deserializeBinary,
+                                null);
+
+                            if (getRes != null) {
+                                v = getRes.value();
+                                ver = getRes.version();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(
+                                null,
+                                null,
+                                /*read-through*/false,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                !deserializeBinary);
+                        }
 
-                    // Entry was not in memory or in swap, so we remove it 
from cache.
-                    if (v == null) {
-                        if (isNew && entry.markObsoleteIfEmpty(ver))
-                            cache.removeEntry(entry);
-                    }
-                    else {
-                        cctx.addResult(locVals,
-                            key,
-                            v,
-                            skipVals,
-                            keepCacheObjects,
-                            deserializeBinary,
-                            true,
-                            getRes,
-                            ver,
-                            0,
-                            0,
-                            needVer);
-
-                        return true;
+                        cache.context().evicts().touch(entry, topVer);
+
+                        // Entry was not in memory or in swap, so we remove it 
from cache.
+                        if (v == null) {
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                cache.removeEntry(entry);
+                        }
                     }
                 }
 
+                if (v != null) {
+                    cctx.addResult(locVals,
+                        key,
+                        v,
+                        skipVals,
+                        keepCacheObjects,
+                        deserializeBinary,
+                        true,
+                        getRes,
+                        ver,
+                        0,
+                        0,
+                        needVer);
+
+                    return true;
+                }
+
                 boolean topStable = cctx.isReplicated() || 
topVer.equals(cctx.topology().topologyVersion());
 
                 // Entry not found, do not continue search if topology did not 
change and there is no store.
@@ -604,9 +639,6 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
      */
     private class MiniFuture extends GridFutureAdapter<Map<K, V>> {
         /** */
-        private static final long serialVersionUID = 0L;
-
-        /** */
         private final IgniteUuid futId = IgniteUuid.randomUuid();
 
         /** Node ID. */

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index dbf1fe1..63ed9a8 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -41,6 +41,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheFutureAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheMessage;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest;
@@ -288,7 +289,7 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
                 expiryPlc != null ? expiryPlc.forCreate() : -1L,
                 expiryPlc != null ? expiryPlc.forAccess() : -1L,
                 skipVals,
-                /**add reader*/false,
+                /*add reader*/false,
                 needVer,
                 cctx.deploymentEnabled(),
                 recovery);
@@ -347,67 +348,101 @@ public class GridPartitionedSingleGetFuture extends 
GridCacheFutureAdapter<Objec
 
         GridDhtCacheAdapter colocated = cctx.dht();
 
+        boolean readNoEntry = cctx.readNoEntry(expiryPlc, false);
+        boolean evt = !skipVals;
+
         while (true) {
             try {
-                GridCacheEntryEx entry = colocated.entryEx(key);
-
-                // If our DHT cache do has value, then we peek it.
-                if (entry != null) {
-                    boolean isNew = entry.isNewLocked();
-
-                    CacheObject v = null;
-                    GridCacheVersion ver = null;
-
-                    if (needVer) {
-                        EntryGetResult res = entry.innerGetVersioned(
-                            null,
-                            null,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            true,
-                            null);
-
-                        if (res != null) {
-                            v = res.value();
-                            ver = res.version();
+                CacheObject v = null;
+                GridCacheVersion ver = null;
+
+                boolean skipEntry = readNoEntry;
+
+                if (readNoEntry) {
+                    CacheDataRow row = cctx.offheap().read(key);
+
+                    if (row != null) {
+                        long expireTime = row.expireTime();
+
+                        if (expireTime == 0 || expireTime > 
U.currentTimeMillis()) {
+                            v = row.value();
+
+                            if (needVer)
+                                ver = row.version();
+
+                            if (evt) {
+                                cctx.events().readEvent(key,
+                                    null,
+                                    row.value(),
+                                    subjId,
+                                    taskName,
+                                    !deserializeBinary);
+                            }
                         }
+                        else
+                            skipEntry = false;
                     }
-                    else {
-                        v = entry.innerGet(
-                            null,
-                            null,
-                            /*read-through*/false,
-                            /**update-metrics*/false,
-                            /*event*/!skipVals,
-                            subjId,
-                            null,
-                            taskName,
-                            expiryPlc,
-                            true);
-                    }
+                }
 
-                    colocated.context().evicts().touch(entry, topVer);
+                if (!skipEntry) {
+                    GridCacheEntryEx entry = colocated.entryEx(key);
+
+                    // If our DHT cache do has value, then we peek it.
+                    if (entry != null) {
+                        boolean isNew = entry.isNewLocked();
+
+                        if (needVer) {
+                            EntryGetResult res = entry.innerGetVersioned(
+                                null,
+                                null,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                true,
+                                null);
+
+                            if (res != null) {
+                                v = res.value();
+                                ver = res.version();
+                            }
+                        }
+                        else {
+                            v = entry.innerGet(
+                                null,
+                                null,
+                                /*read-through*/false,
+                                /*update-metrics*/false,
+                                /*event*/evt,
+                                subjId,
+                                null,
+                                taskName,
+                                expiryPlc,
+                                true);
+                        }
+
+                        colocated.context().evicts().touch(entry, topVer);
 
-                    // Entry was not in memory or in swap, so we remove it 
from cache.
-                    if (v == null) {
-                        if (isNew && entry.markObsoleteIfEmpty(ver))
-                            colocated.removeEntry(entry);
+                        // Entry was not in memory or in swap, so we remove it 
from cache.
+                        if (v == null) {
+                            if (isNew && entry.markObsoleteIfEmpty(ver))
+                                colocated.removeEntry(entry);
+                        }
                     }
-                    else {
-                        if (!skipVals && cctx.config().isStatisticsEnabled())
-                            cctx.cache().metrics0().onRead(true);
+                }
 
-                        if (!skipVals)
-                            setResult(v, ver);
-                        else
-                            setSkipValueResult(true, ver);
+                if (v != null) {
+                    if (!skipVals && cctx.config().isStatisticsEnabled())
+                        cctx.cache().metrics0().onRead(true);
 
-                        return true;
-                    }
+                    if (!skipVals)
+                        setResult(v, ver);
+                    else
+                        setSkipValueResult(true, ver);
+
+                    return true;
                 }
 
                 boolean topStable = cctx.isReplicated() || 
topVer.equals(cctx.topology().topologyVersion());

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index e477592..1d32edd 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -61,6 +61,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
@@ -1473,105 +1474,156 @@ public class GridDhtAtomicCache<K, V> extends 
GridDhtCacheAdapter<K, V> {
 
         final IgniteCacheExpiryPolicy expiry = skipVals ? null : 
expiryPolicy(expiryPlc);
 
+        final boolean evt = !skipVals;
+
         // Optimisation: try to resolve value locally and escape 'get future' 
creation.
         if (!forcePrimary && ctx.affinityNode()) {
-            Map<K, V> locVals = U.newHashMap(keys.size());
-
-            boolean success = true;
-
-            // Optimistically expect that all keys are available locally 
(avoid creation of get future).
-            for (KeyCacheObject key : keys) {
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = entryEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
-
-                            EntryGetResult getRes = null;
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
-
-                            if (needVer) {
-                                getRes = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /*update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
+            try {
+                Map<K, V> locVals = U.newHashMap(keys.size());
+
+                boolean success = true;
+                boolean readNoEntry = ctx.readNoEntry(expiry, false);
+
+                // Optimistically expect that all keys are available locally 
(avoid creation of get future).
+                for (KeyCacheObject key : keys) {
+                    if (readNoEntry) {
+                        CacheDataRow row = ctx.offheap().read(key);
+
+                        if (row != null) {
+                            long expireTime = row.expireTime();
+
+                            if (expireTime == 0 || expireTime > 
U.currentTimeMillis()) {
+                                ctx.addResult(locVals,
+                                    key,
+                                    row.value(),
+                                    skipVals,
+                                    false,
+                                    deserializeBinary,
                                     true,
-                                    null);
-
-                                if (getRes != null) {
-                                    v = getRes.value();
-                                    ver = getRes.version();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(
                                     null,
-                                    null,
-                                    /*read-through*/false,
-                                    /*update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiry,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we 
remove it from cache.
-                            if (v == null) {
-                                GridCacheVersion obsoleteVer = 
context().versions().next();
-
-                                if (isNew && 
entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeEntry(entry);
-
-                                success = false;
+                                    row.version(),
+                                    0,
+                                    0,
+                                    needVer);
+
+                                if (evt) {
+                                    ctx.events().readEvent(key,
+                                        null,
+                                        row.value(),
+                                        subjId,
+                                        taskName,
+                                        !deserializeBinary);
+                                }
                             }
                             else
-                                ctx.addResult(locVals, key, v, skipVals, 
false, deserializeBinary, true,
-                                    getRes, ver, 0, 0, needVer);
+                                success = false;
                         }
                         else
                             success = false;
-
-                        break; // While.
-                    }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
                     }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        success = false;
+                    else {
+                        GridCacheEntryEx entry = null;
+
+                        while (true) {
+                            try {
+                                entry = entryEx(key);
+
+                                // If our DHT cache do has value, then we peek 
it.
+                                if (entry != null) {
+                                    boolean isNew = entry.isNewLocked();
+
+                                    EntryGetResult getRes = null;
+                                    CacheObject v = null;
+                                    GridCacheVersion ver = null;
+
+                                    if (needVer) {
+                                        getRes = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            true,
+                                            null);
+
+                                        if (getRes != null) {
+                                            v = getRes.value();
+                                            ver = getRes.version();
+                                        }
+                                    }
+                                    else {
+                                        v = entry.innerGet(
+                                            null,
+                                            null,
+                                            /*read-through*/false,
+                                            /*update-metrics*/false,
+                                            /*event*/evt,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiry,
+                                            !deserializeBinary);
+                                    }
+
+                                    // Entry was not in memory or in swap, so 
we remove it from cache.
+                                    if (v == null) {
+                                        if (isNew && 
entry.markObsoleteIfEmpty(context().versions().next()))
+                                            removeEntry(entry);
+
+                                        success = false;
+                                    }
+                                    else {
+                                        ctx.addResult(locVals,
+                                            key,
+                                            v,
+                                            skipVals,
+                                            false,
+                                            deserializeBinary,
+                                            true,
+                                            getRes,
+                                            ver,
+                                            0,
+                                            0,
+                                            needVer);
+                                    }
+                                }
+                                else
+                                    success = false;
 
-                        break; // While.
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
-                    finally {
-                        if (entry != null)
-                            ctx.evicts().touch(entry, topVer);
+                                break; // While.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                // No-op, retry.
+                            }
+                            catch (GridDhtInvalidPartitionException ignored) {
+                                success = false;
+
+                                break; // While.
+                            }
+                            finally {
+                                if (entry != null)
+                                    ctx.evicts().touch(entry, topVer);
+                            }
+                        }
                     }
-                }
 
-                if (!success)
-                    break;
-                else if (!skipVals && ctx.config().isStatisticsEnabled())
-                    metrics0().onRead(true);
-            }
+                    if (!success)
+                        break;
+                    else if (!skipVals && ctx.config().isStatisticsEnabled())
+                        metrics0().onRead(true);
+                }
 
-            if (success) {
-                sendTtlUpdateRequest(expiry);
+                if (success) {
+                    sendTtlUpdateRequest(expiry);
 
-                return new GridFinishedFuture<>(locVals);
+                    return new GridFinishedFuture<>(locVals);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 2292cb2..4163376 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -44,6 +44,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest;
@@ -458,118 +459,161 @@ public class GridDhtColocatedCache<K, V> extends 
GridDhtTransactionalCacheAdapte
             expiryPlc = expiryPolicy(null);
 
         // Optimisation: try to resolve value locally and escape 'get future' 
creation.
-        if (!forcePrimary) {
-            Map<K, V> locVals = null;
+        if (!forcePrimary && ctx.affinityNode()) {
+            try {
+                Map<K, V> locVals = null;
 
-            boolean success = true;
+                boolean success = true;
+                boolean readNoEntry = ctx.readNoEntry(expiryPlc, false);
+                boolean evt = !skipVals;
 
-            // Optimistically expect that all keys are available locally 
(avoid creation of get future).
-            for (KeyCacheObject key : keys) {
-                GridCacheEntryEx entry = null;
-
-                while (true) {
-                    try {
-                        entry = entryEx(key);
-
-                        // If our DHT cache do has value, then we peek it.
-                        if (entry != null) {
-                            boolean isNew = entry.isNewLocked();
+                for (KeyCacheObject key : keys) {
+                    if (readNoEntry) {
+                        CacheDataRow row = ctx.offheap().read(key);
 
-                            EntryGetResult getRes = null;
-                            CacheObject v = null;
-                            GridCacheVersion ver = null;
+                        if (row != null) {
+                            long expireTime = row.expireTime();
 
-                            if (needVer) {
-                                getRes = entry.innerGetVersioned(
-                                    null,
-                                    null,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary,
-                                    null);
-
-                                if (getRes != null) {
-                                    v = getRes.value();
-                                    ver = getRes.version();
-                                }
-                            }
-                            else {
-                                v = entry.innerGet(
-                                    null,
-                                    null,
-                                    /*read-through*/false,
-                                    /**update-metrics*/false,
-                                    /*event*/!skipVals,
-                                    subjId,
-                                    null,
-                                    taskName,
-                                    expiryPlc,
-                                    !deserializeBinary);
-                            }
-
-                            // Entry was not in memory or in swap, so we 
remove it from cache.
-                            if (v == null) {
-                                GridCacheVersion obsoleteVer = 
context().versions().next();
-
-                                if (isNew && 
entry.markObsoleteIfEmpty(obsoleteVer))
-                                    removeEntry(entry);
-
-                                success = false;
-                            }
-                            else {
+                            if (expireTime == 0 || expireTime > 
U.currentTimeMillis()) {
                                 if (locVals == null)
                                     locVals = U.newHashMap(keys.size());
 
                                 ctx.addResult(locVals,
                                     key,
-                                    v,
+                                    row.value(),
                                     skipVals,
-                                    keepCacheObj,
+                                    false,
                                     deserializeBinary,
                                     true,
-                                    getRes,
-                                    ver,
+                                    null,
+                                    row.version(),
                                     0,
                                     0,
                                     needVer);
+
+                                if (evt) {
+                                    ctx.events().readEvent(key,
+                                        null,
+                                        row.value(),
+                                        subjId,
+                                        taskName,
+                                        !deserializeBinary);
+                                }
                             }
+                            else
+                                success = false;
                         }
                         else
                             success = false;
-
-                        break; // While.
                     }
-                    catch (GridCacheEntryRemovedException ignored) {
-                        // No-op, retry.
-                    }
-                    catch (GridDhtInvalidPartitionException ignored) {
-                        success = false;
+                    else {
+                        GridCacheEntryEx entry = null;
+
+                        while (true) {
+                            try {
+                                entry = entryEx(key);
+
+                                // If our DHT cache do has value, then we peek 
it.
+                                if (entry != null) {
+                                    boolean isNew = entry.isNewLocked();
+
+                                    EntryGetResult getRes = null;
+                                    CacheObject v = null;
+                                    GridCacheVersion ver = null;
+
+                                    if (needVer) {
+                                        getRes = entry.innerGetVersioned(
+                                            null,
+                                            null,
+                                            /*update-metrics*/false,
+                                            /*event*/!skipVals,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiryPlc,
+                                            !deserializeBinary,
+                                            null);
+
+                                        if (getRes != null) {
+                                            v = getRes.value();
+                                            ver = getRes.version();
+                                        }
+                                    }
+                                    else {
+                                        v = entry.innerGet(
+                                            null,
+                                            null,
+                                            /*read-through*/false,
+                                            /*update-metrics*/false,
+                                            /*event*/!skipVals,
+                                            subjId,
+                                            null,
+                                            taskName,
+                                            expiryPlc,
+                                            !deserializeBinary);
+                                    }
+
+                                    // Entry was not in memory or in swap, so 
we remove it from cache.
+                                    if (v == null) {
+                                        GridCacheVersion obsoleteVer = 
context().versions().next();
+
+                                        if (isNew && 
entry.markObsoleteIfEmpty(obsoleteVer))
+                                            removeEntry(entry);
+
+                                        success = false;
+                                    }
+                                    else {
+                                        if (locVals == null)
+                                            locVals = 
U.newHashMap(keys.size());
+
+                                        ctx.addResult(locVals,
+                                            key,
+                                            v,
+                                            skipVals,
+                                            keepCacheObj,
+                                            deserializeBinary,
+                                            true,
+                                            getRes,
+                                            ver,
+                                            0,
+                                            0,
+                                            needVer);
+                                    }
+                                }
+                                else
+                                    success = false;
 
-                        break; // While.
-                    }
-                    catch (IgniteCheckedException e) {
-                        return new GridFinishedFuture<>(e);
-                    }
-                    finally {
-                        if (entry != null)
-                            context().evicts().touch(entry, topVer);
+                                break; // While.
+                            }
+                            catch (GridCacheEntryRemovedException ignored) {
+                                // No-op, retry.
+                            }
+                            catch (GridDhtInvalidPartitionException ignored) {
+                                success = false;
+
+                                break; // While.
+                            }
+                            finally {
+                                if (entry != null)
+                                    context().evicts().touch(entry, topVer);
+                            }
+                        }
+
+                        if (!success)
+                            break;
+                        else if (!skipVals && 
ctx.config().isStatisticsEnabled())
+                            ctx.cache().metrics0().onRead(true);
                     }
                 }
 
-                if (!success)
-                    break;
-                else if (!skipVals && ctx.config().isStatisticsEnabled())
-                    ctx.cache().metrics0().onRead(true);
-            }
-
-            if (success) {
-                sendTtlUpdateRequest(expiryPlc);
+                if (success) {
+                    sendTtlUpdateRequest(expiryPlc);
 
-                return new GridFinishedFuture<>(locVals);
+                    return new GridFinishedFuture<>(locVals);
+                }
+            }
+            catch (IgniteCheckedException e) {
+                return new GridFinishedFuture<>(e);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index e1d4484..56041ee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -55,6 +55,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
 import org.apache.ignite.internal.processors.cache.GridCacheReturn;
 import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
 import org.apache.ignite.internal.processors.cache.KeyCacheObject;
+import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.local.GridLocalCache;
 import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
@@ -384,7 +385,7 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
 
         UUID subjId = ctx.subjectIdPerCall(null, opCtx);
 
-        Map<K, V> vals = new HashMap<>(keys.size(), 1.0f);
+        Map<K, V> vals = U.newHashMap(keys.size());
 
         if (keyCheck)
             validateCacheKeys(keys);
@@ -392,97 +393,142 @@ public class GridLocalAtomicCache<K, V> extends 
GridLocalCache<K, V> {
         final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? 
opCtx.expiry() : null);
 
         boolean success = true;
+        boolean readNoEntry = ctx.readNoEntry(expiry, false);
+        final boolean evt = !skipVals;
 
         for (K key : keys) {
             if (key == null)
                 throw new NullPointerException("Null key.");
 
-            GridCacheEntryEx entry = null;
-
             KeyCacheObject cacheKey = ctx.toCacheKeyObject(key);
 
-            while (true) {
-                try {
-                    entry = entryEx(cacheKey);
+            boolean skipEntry = readNoEntry;
 
-                    if (entry != null) {
-                        CacheObject v;
+            if (readNoEntry) {
+                CacheDataRow row = ctx.offheap().read(cacheKey);
 
-                        if (needVer) {
-                            EntryGetResult res = entry.innerGetVersioned(
-                                null,
-                                null,
-                                /**update-metrics*/false,
-                                /*event*/!skipVals,
-                                subjId,
-                                null,
-                                taskName,
-                                expiry,
-                                !deserializeBinary,
-                                null);
-
-                            if (res != null) {
-                                ctx.addResult(
-                                    vals,
-                                    cacheKey,
-                                    res,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
-                                    needVer);
-                            }
-                            else
-                                success = false;
-                        }
-                        else {
-                            v = entry.innerGet(
-                                null,
+                if (row != null) {
+                    long expireTime = row.expireTime();
+
+                    if (expireTime == 0 || expireTime > U.currentTimeMillis()) 
{
+                        ctx.addResult(vals,
+                            cacheKey,
+                            row.value(),
+                            skipVals,
+                            false,
+                            deserializeBinary,
+                            true,
+                            null,
+                            row.version(),
+                            0,
+                            0,
+                            needVer);
+
+                        if (configuration().isStatisticsEnabled() && !skipVals)
+                            metrics0().onRead(true);
+
+                        if (evt) {
+                            ctx.events().readEvent(cacheKey,
                                 null,
-                                /*read-through*/false,
-                                /**update-metrics*/true,
-                                /**event*/!skipVals,
+                                row.value(),
                                 subjId,
-                                null,
                                 taskName,
-                                expiry,
                                 !deserializeBinary);
+                        }
+                    }
+                    else
+                        skipEntry = false;
+                }
+                else
+                    success = false;
+            }
 
-                            if (v != null) {
-                                ctx.addResult(vals,
-                                    cacheKey,
-                                    v,
-                                    skipVals,
-                                    false,
-                                    deserializeBinary,
-                                    true,
+            if (!skipEntry) {
+                GridCacheEntryEx entry = null;
+
+                while (true) {
+                    try {
+                        entry = entryEx(cacheKey);
+
+                        if (entry != null) {
+                            CacheObject v;
+
+                            if (needVer) {
+                                EntryGetResult res = entry.innerGetVersioned(
                                     null,
-                                    0,
-                                    0);
+                                    null,
+                                    /*update-metrics*/false,
+                                    /*event*/evt,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary,
+                                    null);
+
+                                if (res != null) {
+                                    ctx.addResult(
+                                        vals,
+                                        cacheKey,
+                                        res,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        needVer);
+                                }
+                                else
+                                    success = false;
+                            }
+                            else {
+                                v = entry.innerGet(
+                                    null,
+                                    null,
+                                    /*read-through*/false,
+                                    /*update-metrics*/true,
+                                    /*event*/evt,
+                                    subjId,
+                                    null,
+                                    taskName,
+                                    expiry,
+                                    !deserializeBinary);
+
+                                if (v != null) {
+                                    ctx.addResult(vals,
+                                        cacheKey,
+                                        v,
+                                        skipVals,
+                                        false,
+                                        deserializeBinary,
+                                        true,
+                                        null,
+                                        0,
+                                        0);
+                                }
+                                else
+                                    success = false;
                             }
-                            else
-                                success = false;
                         }
-                    }
-                    else {
-                        if (!storeEnabled && 
configuration().isStatisticsEnabled() && !skipVals)
-                            metrics0().onRead(false);
+                        else {
+                            if (!storeEnabled && 
configuration().isStatisticsEnabled() && !skipVals)
+                                metrics0().onRead(false);
 
-                        success = false;
+                            success = false;
+                        }
+
+                        break; // While.
+                    }
+                    catch (GridCacheEntryRemovedException ignored) {
+                        // No-op, retry.
+                    }
+                    finally {
+                        if (entry != null)
+                            ctx.evicts().touch(entry, 
ctx.affinity().affinityTopologyVersion());
                     }
 
-                    break; // While.
-                }
-                catch (GridCacheEntryRemovedException ignored) {
-                    // No-op, retry.
+                    if (!success && storeEnabled)
+                        break;
                 }
-                finally {
-                    if (entry != null)
-                        ctx.evicts().touch(entry, 
ctx.affinity().affinityTopologyVersion());
-                }
-
-                if (!success && storeEnabled)
-                    break;
             }
         }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
new file mode 100644
index 0000000..4e2c534
--- /dev/null
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheNoSyncForGetTest.java
@@ -0,0 +1,394 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import javax.cache.expiry.Duration;
+import javax.cache.expiry.ModifiedExpiryPolicy;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.CacheAtomicityMode;
+import org.apache.ignite.cache.CacheEntry;
+import org.apache.ignite.cache.CacheEntryProcessor;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.ATOMIC;
+import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
+import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC;
+
+/**
+ *
+ */
+@SuppressWarnings("unchecked")
+public class IgniteCacheNoSyncForGetTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new 
TcpDiscoveryVmIpFinder(true);
+
+    /** */
+    private static volatile CountDownLatch processorStartLatch;
+
+    /** */
+    private static volatile CountDownLatch hangLatch;
+
+    /** */
+    private boolean client;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+        ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(IP_FINDER);
+
+        cfg.setClientMode(client);
+
+        return cfg;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        super.beforeTestsStarted();
+
+        startGrid(0);
+
+        client = true;
+
+        startGrid(1);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+
+        super.afterTestsStopped();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testAtomicGet() throws Exception {
+        getTest(ATOMIC);
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testTxGet() throws Exception {
+        getTest(TRANSACTIONAL);
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @throws Exception If failed.
+     */
+    private void getTest(CacheAtomicityMode atomicityMode) throws Exception {
+        boolean getAll[] = {true, false};
+        boolean cfgExpiryPlc[] = {true, false};
+        boolean withExpiryPlc[] = {true, false};
+        boolean heapCache[] = {true, false};
+        for (boolean getAll0 : getAll) {
+            for (boolean expiryPlc0 : cfgExpiryPlc) {
+                for (boolean withExpiryPlc0 : withExpiryPlc) {
+                    for (boolean heapCache0 : heapCache)
+                        doGet(atomicityMode, heapCache0, getAll0, expiryPlc0, 
withExpiryPlc0);
+                }
+            }
+        }
+    }
+
+    /**
+     * @param atomicityMode Cache atomicity mode.
+     * @param heapCache Heap cache flag.
+     * @param getAll Test getAll flag.
+     * @param cfgExpiryPlc Configured expiry policy flag.
+     * @param withExpiryPlc Custom expiry policy flag.
+     * @throws Exception If failed.
+     */
+    private void doGet(CacheAtomicityMode atomicityMode,
+        boolean heapCache,
+        final boolean getAll,
+        final boolean cfgExpiryPlc,
+        final boolean withExpiryPlc) throws Exception {
+        log.info("Test get [getAll=" + getAll + ", cfgExpiryPlc=" + 
cfgExpiryPlc + ']');
+
+        Ignite srv = ignite(0);
+
+        Ignite client = ignite(1);
+
+        final IgniteCache cache = 
client.createCache(cacheConfiguration(atomicityMode, heapCache, cfgExpiryPlc));
+
+        final Map<Object, Object> data = new HashMap<>();
+
+        data.put(1, 1);
+        data.put(2, 2);
+
+        try {
+            // Get from compute closure.
+            {
+                cache.putAll(data);
+
+                hangLatch = new CountDownLatch(1);
+                processorStartLatch = new CountDownLatch(1);
+
+                IgniteInternalFuture fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new 
HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
+
+                        return null;
+                    }
+                });
+
+                try {
+                    boolean wait = processorStartLatch.await(30, 
TimeUnit.SECONDS);
+
+                    assertTrue(wait);
+
+                    if (getAll) {
+                        assertEquals(data, 
client.compute().affinityCall(cache.getName(), 1,
+                            new GetAllClosure(data.keySet(), cache.getName(), 
withExpiryPlc)));
+                    }
+                    else {
+                        assertEquals(1, 
client.compute().affinityCall(cache.getName(), 1,
+                            new GetClosure(1, cache.getName(), 
withExpiryPlc)));
+                    }
+
+                    hangLatch.countDown();
+
+                    fut.get();
+                }
+                finally {
+                    hangLatch.countDown();
+                }
+            }
+
+            // Local get.
+            {
+                cache.putAll(data);
+
+                hangLatch = new CountDownLatch(1);
+                processorStartLatch = new CountDownLatch(1);
+
+                IgniteInternalFuture fut = GridTestUtils.runAsync(new 
Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        if (getAll)
+                            cache.invokeAll(data.keySet(), new 
HangEntryProcessor());
+                        else
+                            cache.invoke(1, new HangEntryProcessor());
+
+                        return null;
+                    }
+                });
+
+                try {
+                    boolean wait = processorStartLatch.await(30, 
TimeUnit.SECONDS);
+
+                    assertTrue(wait);
+
+                    IgniteCache srvCache = srv.cache(cache.getName());
+
+                    if (withExpiryPlc)
+                        srvCache = 
srvCache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+                    if (getAll) {
+                        assertEquals(data, srvCache.getAll(data.keySet()));
+                        assertEquals(data.size(), 
srvCache.getEntries(data.keySet()).size());
+                    }
+                    else {
+                        assertEquals(1, srvCache.get(1));
+                        assertEquals(1, srvCache.getEntry(1).getValue());
+                    }
+
+                    hangLatch.countDown();
+
+                    fut.get();
+                }
+                finally {
+                    hangLatch.countDown();
+                }
+            }
+        }
+        finally {
+            client.destroyCache(cache.getName());
+        }
+    }
+
+    /**
+     * @param atomicityMode Atomicity mode.
+     * @param heapCache Heap cache flag.
+     * @param expiryPlc Expiry policy flag.
+     * @return Cache configuration.
+     */
+    private CacheConfiguration cacheConfiguration(CacheAtomicityMode 
atomicityMode,
+        boolean heapCache,
+        boolean expiryPlc) {
+        CacheConfiguration ccfg = new CacheConfiguration();
+
+        ccfg.setAtomicityMode(atomicityMode);
+        ccfg.setOnheapCacheEnabled(heapCache);
+        ccfg.setWriteSynchronizationMode(FULL_SYNC);
+        ccfg.setName("testCache");
+
+        if (expiryPlc)
+            
ccfg.setExpiryPolicyFactory(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES));
+
+        return ccfg;
+    }
+
+    /**
+     *
+     */
+    static class HangEntryProcessor implements CacheEntryProcessor {
+        /** {@inheritDoc} */
+        @Override public Object process(MutableEntry entry, Object... 
arguments) {
+            assert processorStartLatch != null;
+            assert hangLatch != null;
+
+            try {
+                processorStartLatch.countDown();
+
+                if (!hangLatch.await(60, TimeUnit.SECONDS))
+                    throw new RuntimeException("Failed to wait for latch");
+            }
+            catch (Exception e) {
+                System.out.println("Unexpected error: " + e);
+
+                throw new EntryProcessorException(e);
+            }
+
+            entry.setValue(U.currentTimeMillis());
+
+            return null;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class GetClosure implements IgniteCallable<Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private final int key;
+
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final boolean withExpiryPlc;
+
+        /**
+         * @param key Key.
+         * @param cacheName Cache name.
+         * @param withExpiryPlc Custom expiry policy flag.
+         */
+        GetClosure(int key, String cacheName, boolean withExpiryPlc) {
+            this.key = key;
+            this.cacheName = cacheName;
+            this.withExpiryPlc = withExpiryPlc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(cacheName);
+
+            if (withExpiryPlc)
+                cache = 
cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+            Object val = cache.get(key);
+
+            CacheEntry e = cache.getEntry(key);
+
+            assertEquals(val, e.getValue());
+
+            return val;
+        }
+    }
+
+    /**
+     *
+     */
+    public static class GetAllClosure implements IgniteCallable<Object> {
+        /** */
+        @IgniteInstanceResource
+        private Ignite ignite;
+
+        /** */
+        private final Set<Object> keys;
+
+        /** */
+        private final String cacheName;
+
+        /** */
+        private final boolean withExpiryPlc;
+
+        /**
+         * @param keys Keys.
+         * @param cacheName Cache name.
+         * @param withExpiryPlc Custom expiry policy flag.
+         */
+        GetAllClosure(Set<Object> keys, String cacheName, boolean 
withExpiryPlc) {
+            this.keys = keys;
+            this.cacheName = cacheName;
+            this.withExpiryPlc = withExpiryPlc;
+        }
+
+        /** {@inheritDoc} */
+        @Override public Object call() throws Exception {
+            IgniteCache cache = ignite.cache(cacheName);
+
+            if (withExpiryPlc)
+                cache = 
cache.withExpiryPolicy(ModifiedExpiryPolicy.factoryOf(Duration.FIVE_MINUTES).create());
+
+            Map vals = cache.getAll(keys);
+
+            Collection<CacheEntry> entries = cache.getEntries(keys);
+
+            assertEquals(vals.size(), entries.size());
+
+            for (CacheEntry entry : entries) {
+                Object val = vals.get(entry.getKey());
+
+                assertEquals(val, entry.getValue());
+            }
+
+            return vals;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
index 5d01716..5be7e07 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/expiry/IgniteCacheExpiryPolicyAbstractTest.java
@@ -146,7 +146,8 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest 
extends IgniteCacheAbs
         assertEquals(0, pSize);
     }
 
-    /**     * @throws Exception If failed.
+    /**
+     * @throws Exception If failed.
      */
     public void testZeroOnCreate() throws Exception {
         factory = CreatedExpiryPolicy.factoryOf(Duration.ZERO);
@@ -1012,7 +1013,7 @@ public abstract class IgniteCacheExpiryPolicyAbstractTest 
extends IgniteCacheAbs
         if(cacheMode() != PARTITIONED)
             return;
 
-        factory =  CreatedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.SECONDS,1));
+        factory =  CreatedExpiryPolicy.factoryOf(new 
Duration(TimeUnit.SECONDS, 2));
 
         nearCache = true;
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/52aa0ab4/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index 89e8f01..4ac3b3b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -40,6 +40,7 @@ import 
org.apache.ignite.internal.processors.cache.GridCacheVariableTopologySelf
 import 
org.apache.ignite.internal.processors.cache.IgniteAtomicCacheEntryProcessorNodeJoinTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCacheEntryProcessorNodeJoinTest;
 import org.apache.ignite.internal.processors.cache.IgniteCacheIncrementTxTest;
+import org.apache.ignite.internal.processors.cache.IgniteCacheNoSyncForGetTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteCachePartitionMapUpdateTest;
 import 
org.apache.ignite.internal.processors.cache.IgniteDynamicCacheAndNodeStop;
 import 
org.apache.ignite.internal.processors.cache.IgniteOnePhaseCommitInvokeTest;
@@ -265,6 +266,8 @@ public class IgniteCacheTestSuite2 extends TestSuite {
 
         suite.addTest(new TestSuite(IgniteOnePhaseCommitInvokeTest.class));
 
+        suite.addTest(new TestSuite(IgniteCacheNoSyncForGetTest.class));
+
         return suite;
     }
 }

Reply via email to