'Single' operations optimizations for tx cache.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/007ac2b1
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/007ac2b1
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/007ac2b1

Branch: refs/heads/ignite-single-op-get
Commit: 007ac2b1fcc50ad9e610fd2a689e9f9353e1ff78
Parents: 58749e3
Author: sboikov <sboi...@gridgain.com>
Authored: Tue Nov 17 20:00:41 2015 +0300
Committer: sboikov <sboi...@gridgain.com>
Committed: Tue Nov 17 22:16:34 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      |  38 ++--
 .../processors/cache/GridCacheIoManager.java    |  19 +-
 .../distributed/dht/GridDhtCacheAdapter.java    |   3 +
 .../dht/GridPartitionedGetFuture.java           |   4 +-
 .../dht/GridPartitionedSingleGetFuture.java     | 144 +++++++++------
 .../near/GridNearSingleGetResponse.java         |  17 ++
 .../IgniteCacheP2pUnmarshallingErrorTest.java   | 184 +++++++++++++------
 7 files changed, 259 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 14b45a2..32c1a2c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -599,11 +599,11 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) {
+    @Override public final IgniteInternalFuture<Boolean> containsKeyAsync(K 
key) {
         A.notNull(key, "key");
 
-        return getAllAsync(
-            Collections.singletonList(key),
+        return (IgniteInternalFuture)getAsync(
+            key,
             /*force primary*/false,
             /*skip tx*/false,
             /*subj id*/null,
@@ -611,15 +611,7 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
             /*deserialize portable*/false,
             /*skip values*/true,
             /*can remap*/true
-        ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
-            @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> 
fut) throws IgniteCheckedException {
-                Map<K, V> map = fut.get();
-
-                assert map.isEmpty() || map.size() == 1 : map.size();
-
-                return map.isEmpty() ? false : map.values().iterator().next() 
!= null;
-            }
-        });
+        );
     }
 
     /** {@inheritDoc} */
@@ -1484,14 +1476,14 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
      * @return Future for the get operation.
      */
     protected IgniteInternalFuture<V> getAsync(
-        final K key,
-        boolean forcePrimary,
-        boolean skipTx,
-        @Nullable UUID subjId,
-        String taskName,
-        boolean deserializePortable,
-        boolean skipVals,
-        boolean canRemap
+            final K key,
+            boolean forcePrimary,
+            boolean skipTx,
+            @Nullable UUID subjId,
+            String taskName,
+            boolean deserializePortable,
+            final boolean skipVals,
+            boolean canRemap
     ) {
         return getAllAsync(Collections.singletonList(key),
             forcePrimary,
@@ -1505,6 +1497,12 @@ public abstract class GridCacheAdapter<K, V> implements 
IgniteInternalCache<K, V
                 @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) 
throws IgniteCheckedException {
                     Map<K, V> map = e.get();
 
+                    if (skipVals) {
+                        Boolean val = !map.isEmpty();
+
+                        return (V)(val);
+                    }
+
                     assert map.isEmpty() || map.size() == 1 : map.size();
 
                     return map.get(key);

http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
index ad1704d..50fc1bb 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java
@@ -515,12 +515,12 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
                 GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg;
 
                 GridNearSingleGetResponse res = new GridNearSingleGetResponse(
-                        ctx.cacheId(),
-                        req.futureId(),
-                        req.topologyVersion(),
-                        null,
-                        false,
-                        req.deployInfo() != null);
+                    ctx.cacheId(),
+                    req.futureId(),
+                    req.topologyVersion(),
+                    null,
+                    false,
+                    req.deployInfo() != null);
 
                 res.error(req.classError());
 
@@ -532,10 +532,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
             case 116: {
                 GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg;
 
-                GridCacheFuture fut = ctx.mvcc().future(res.futureId());
-
-                if (fut == null)
-                    fut = ctx.mvcc().future(res.futureId());
+                GridPartitionedSingleGetFuture fut = 
(GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId());
 
                 if (fut == null) {
                     if (log.isDebugEnabled())
@@ -546,7 +543,7 @@ public class GridCacheIoManager extends 
GridCacheSharedManagerAdapter {
 
                 res.error(res.classError());
 
-                ((GridPartitionedSingleGetFuture)fut).onResult(nodeId, res);
+                fut.onResult(nodeId, res);
             }
 
             break;

http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index 96b15f3..8537357 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -767,6 +767,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends 
GridDistributedCacheAdap
                             res0,
                             false,
                             req.addDeploymentInfo());
+
+                        if (info != null && req.skipValues())
+                            res.setContainsValue();
                     }
                     else {
                         res = new GridNearSingleGetResponse(ctx.cacheId(),

http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 6be56e0..5a9b6c7 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -21,7 +21,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
@@ -392,7 +391,8 @@ public class GridPartitionedGetFuture<K, V> extends 
CacheDistributedGetFutureAda
         boolean remote = false;
 
         // Allow to get cached value from the local node.
-        boolean allowLocRead = !forcePrimary || 
cctx.affinity().primary(cctx.localNode(), key, topVer);
+        boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) ||
+                cctx.affinity().primary(cctx.localNode(), key, topVer);
 
         while (true) {
             GridCacheEntryEx entry;

http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
index e19c9b0..4a5c71e 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java
@@ -188,6 +188,7 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
     /**
      * @param topVer Topology version.
      */
+    @SuppressWarnings("unchecked")
     private void map(AffinityTopologyVersion topVer) {
         ClusterNode node = mapKeyToNode(topVer);
 
@@ -375,7 +376,14 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
                                 colocated.removeIfObsolete(key);
                         }
                         else {
-                            setResult(v, ver);
+                            if (!skipVals) {
+                                if (cctx.config().isStatisticsEnabled())
+                                    cctx.cache().metrics0().onRead(true);
+
+                                setResult(v, ver);
+                            }
+                            else
+                                setSkipValueResult(true, ver);
 
                             return null;
                         }
@@ -405,24 +413,34 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
      * @param res Result.
      */
     public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
-        if (!processResponse(nodeId))
+        if (!processResponse(nodeId) || !checkError(res.error(), 
res.invalidPartitions(), res.topologyVersion()))
             return;
 
         Message res0 = res.result();
 
-        CacheObject val;
-        GridCacheVersion ver = null;
-
         if (needVer) {
             CacheVersionedValue verVal = (CacheVersionedValue)res0;
 
-            val = verVal.value();
-            ver = verVal.version();
+            if (verVal != null) {
+                if (skipVals)
+                    setSkipValueResult(true, verVal.version());
+                else
+                    setResult(verVal.value(), verVal.version());
+            }
+            else {
+                if (skipVals)
+                    setSkipValueResult(false, null);
+                else
+                    setResult(null , null);
+            }
+        }
+        else {
+            if (skipVals) {
+                setSkipValueResult(res.containsValue(), null);
+            }
+            else
+                setResult((CacheObject)res0, null);
         }
-        else
-            val = (CacheObject)res0;
-
-        onResult(res.error(), res.invalidPartitions(), res.topologyVersion(), 
val, ver);
     }
 
     /**
@@ -430,14 +448,15 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
      * @param res Result.
      */
     public void onResult(UUID nodeId, GridNearGetResponse res) {
-        if (!processResponse(nodeId))
+        if (!processResponse(nodeId) ||
+            !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), 
res.topologyVersion()))
             return;
 
         Collection<GridCacheEntryInfo> infos = res.entries();
 
         assert F.isEmpty(infos) || infos.size() == 1 : infos;
 
-        onResult(res.error(), !F.isEmpty(res.invalidPartitions()), 
res.topologyVersion(), F.first(infos));
+        setResult(F.first(infos));
     }
 
     /**
@@ -460,37 +479,14 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
      * @param err Error.
      * @param invalidParts Invalid partitions error flag.
      * @param rmtTopVer Received topology version.
-     * @param info Entry info.
-     */
-    private void onResult(@Nullable IgniteCheckedException err,
-        boolean invalidParts,
-        AffinityTopologyVersion rmtTopVer,
-        @Nullable GridCacheEntryInfo info) {
-        if (info != null) {
-            assert skipVals == (info.value() == null);
-
-            onResult(err, invalidParts, rmtTopVer, info.value(), 
info.version());
-        }
-        else
-            onResult(err, invalidParts, rmtTopVer, null, null);
-    }
-
-    /**
-     * @param err Error.
-     * @param invalidParts Invalid partitions error flag.
-     * @param rmtTopVer Received topology version.
-     * @param val Value.
-     * @param ver Version.
      */
-    private void onResult(@Nullable IgniteCheckedException err,
+    private boolean checkError(@Nullable IgniteCheckedException err,
         boolean invalidParts,
-        AffinityTopologyVersion rmtTopVer,
-        @Nullable CacheObject val,
-        @Nullable GridCacheVersion ver) {
+        AffinityTopologyVersion rmtTopVer) {
         if (err != null) {
             onDone(err);
 
-            return;
+            return false;
         }
 
         if (invalidParts) {
@@ -503,7 +499,7 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
                     "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", 
part=" + cctx.affinity().partition(key) +
                     ", nodeId=" + node.id() + ']'));
 
-                return;
+                return false;
             }
 
             if (canRemap) {
@@ -525,22 +521,47 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
             }
             else
                 map(topVer);
+
+            return false;
         }
-        else
-            setResult(val, ver);
+
+        return true;
     }
 
     /**
      * @param info Entry info.
      */
     private void setResult(@Nullable GridCacheEntryInfo info) {
-        if (info != null) {
-            assert skipVals == (info.value() == null);
+        assert info == null || skipVals == (info.value() == null);
 
-            setResult(info.value(), info.version());
+        if (skipVals) {
+            if (info != null)
+                setSkipValueResult(true, info.version());
+            else
+                setSkipValueResult(false, null);
+        }
+        else {
+            if (info != null)
+                setResult(info.value(), info.version());
+            else
+                setResult(null, null);
+        }
+    }
+
+    /**
+     * @param res Result.
+     * @param ver Version.
+     */
+    private void setSkipValueResult(boolean res, @Nullable GridCacheVersion 
ver) {
+        assert skipVals;
+
+        if (needVer) {
+            assert ver != null || !res;
+
+            onDone(new T2<>(res, ver));
         }
         else
-            onDone(skipVals ? false : null);
+            onDone(res);
     }
 
     /**
@@ -549,24 +570,29 @@ public class GridPartitionedSingleGetFuture extends 
GridFutureAdapter<Object> im
      */
     private void setResult(@Nullable CacheObject val, @Nullable 
GridCacheVersion ver) {
         try {
-            if (needVer) {
-                assert ver != null;
-                assert skipVals || val != null;
+            assert !skipVals;
 
-                onDone(new T2<>(skipVals ? true : val, ver));
-            }
-            else {
-                if (!keepCacheObjects) {
-                    Object res = skipVals ? true : CU.value(val, cctx, true);
+            if (val != null) {
+                if (needVer) {
+                    assert ver != null;
+
+                    onDone(new T2<>(val, ver));
+                }
+                else {
+                    if (!keepCacheObjects) {
+                        Object res = CU.value(val, cctx, true);
 
-                    if (deserializePortable && !skipVals)
-                        res = cctx.unwrapPortableIfNeeded(res, false);
+                        if (deserializePortable && !skipVals)
+                            res = cctx.unwrapPortableIfNeeded(res, false);
 
-                    onDone(res);
+                        onDone(res);
+                    }
+                    else
+                        onDone(val);
                 }
-                else
-                    onDone(skipVals ? true : val);
             }
+            else
+                onDone(null);
         }
         catch (Exception e) {
             onDone(e);

http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
index eedf1e6..66b4d9b 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java
@@ -44,6 +44,9 @@ public class GridNearSingleGetResponse extends 
GridCacheMessage implements GridC
     /** */
     public static final int INVALID_PART_FLAG_MASK = 0x1;
 
+    /** */
+    public static final int CONTAINS_VAL_FLAG_MASK = 0x2;
+
     /** Future ID. */
     private IgniteUuid futId;
 
@@ -125,6 +128,20 @@ public class GridNearSingleGetResponse extends 
GridCacheMessage implements GridC
     }
 
     /**
+     * @return {@code True} if the contains-value flag is set (set for requests with {@link 
GridNearSingleGetRequest#skipValues()} when an entry was found).
+     */
+    public boolean containsValue() {
+        return (flags & CONTAINS_VAL_FLAG_MASK) != 0;
+    }
+
+    /**
+     * Sets the contains-value flag ({@link #CONTAINS_VAL_FLAG_MASK}) on this response.
+     */
+    public void setContainsValue() {
+        flags = (byte)(flags | CONTAINS_VAL_FLAG_MASK);
+    }
+
+    /**
      * @return Result.
      */
     public Message result() {

http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
index f4423f7..9c1abc7 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java
@@ -21,6 +21,7 @@ import java.io.Externalizable;
 import java.io.IOException;
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.cache.CacheException;
 import org.apache.ignite.cache.CacheAtomicWriteOrderMode;
@@ -30,6 +31,7 @@ import 
org.apache.ignite.cache.query.annotations.QuerySqlField;
 import org.apache.ignite.configuration.CacheConfiguration;
 import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.X;
 
 /**
@@ -37,7 +39,10 @@ import org.apache.ignite.internal.util.typedef.X;
  */
 public class IgniteCacheP2pUnmarshallingErrorTest extends 
IgniteCacheAbstractTest {
     /** Allows to change behavior of readExternal method. */
-    protected static AtomicInteger readCnt = new AtomicInteger();
+    protected static final AtomicInteger readCnt = new AtomicInteger();
+
+    /** Allows changing the behavior of the {@code TestValue#readExternal} method. */
+    protected static final AtomicInteger valReadCnt = new AtomicInteger();
 
     /** Iterable key. */
     protected static int key = 0;
@@ -86,71 +91,40 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends 
IgniteCacheAbstractTes
         return cfg;
     }
 
-    /** Test key 1. */
-    protected static class TestKey implements Externalizable {
-        /** Field. */
-        @QuerySqlField(index = true)
-        private String field;
-
-        /**
-         * @param field Test key 1.
-         */
-        public TestKey(String field) {
-            this.field = field;
-        }
-
-        /** Test key 1. */
-        public TestKey() {
-            // No-op.
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean equals(Object o) {
-            if (this == o)
-                return true;
-            if (o == null || getClass() != o.getClass())
-                return false;
-
-            TestKey key = (TestKey)o;
-
-            return !(field != null ? !field.equals(key.field) : key.field != 
null);
-        }
+    /**
+     * Sends put atomically and handles fail.
+     *
+     * @param k Key.
+     */
+    protected void failAtomicPut(int k) {
+        try {
+            jcache(0).put(new TestKey(String.valueOf(k)), "");
 
-        /** {@inheritDoc} */
-        @Override public int hashCode() {
-            return field != null ? field.hashCode() : 0;
+            assert false : "p2p marshalling failed, but error response was not 
sent";
         }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
-            out.writeObject(field);
+        catch (CacheException e) {
+            assert X.hasCause(e, IOException.class);
         }
 
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-            field = (String)in.readObject();
-
-            if (readCnt.decrementAndGet() <= 0)
-                throw new IOException("Class can not be unmarshalled.");
-        }
+        assert readCnt.get() == 0; //ensure we have read count as expected.
     }
 
     /**
-     * Sends put atomically and handles fail.
+     * Sends getAll for a single key and handles the expected failure.
      *
      * @param k Key.
      */
-    protected void failAtomicPut(int k) {
+    protected void failGetAll(int k) {
         try {
-            jcache(0).put(new TestKey(String.valueOf(k)), "");
+            Set<Object> keys = F.<Object>asSet(new TestKey(String.valueOf(k)));
+
+            jcache(0).getAll(keys);
 
             assert false : "p2p marshalling failed, but error response was not 
sent";
         }
         catch (CacheException e) {
             assert X.hasCause(e, IOException.class);
         }
-
-        assert readCnt.get() == 0; //ensure we have read count as expected.
     }
 
     /**
@@ -158,7 +132,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends 
IgniteCacheAbstractTes
      *
      * @param k Key.
      */
-    protected void failAtomicGet(int k) {
+    protected void failGet(int k) {
         try {
             jcache(0).get(new TestKey(String.valueOf(k)));
 
@@ -175,38 +149,132 @@ public class IgniteCacheP2pUnmarshallingErrorTest 
extends IgniteCacheAbstractTes
      * @throws Exception If failed.
      */
     public void testResponseMessageOnUnmarshallingFailed() throws Exception {
-        //GridNearAtomicUpdateRequest unmarshalling failed test
+        // GridNearAtomicUpdateRequest unmarshalling failed test.
         readCnt.set(1);
 
         failAtomicPut(++key);
 
-        //Check that cache is empty.
+        // Check that cache is empty.
         readCnt.set(Integer.MAX_VALUE);
 
         assert jcache(0).get(new TestKey(String.valueOf(key))) == null;
 
-        //GridDhtAtomicUpdateRequest unmarshalling failed test
+        // GridDhtAtomicUpdateRequest unmarshalling failed test.
         readCnt.set(2);
 
         failAtomicPut(++key);
 
-        //Check that cache is not empty.
+        // Check that cache is not empty.
         readCnt.set(Integer.MAX_VALUE);
 
         assert jcache(0).get(new TestKey(String.valueOf(key))) != null;
 
-        //GridNearGetRequest unmarshalling failed test
+        // GridNearGetRequest unmarshalling failed test.
         readCnt.set(1);
 
-        failAtomicGet(++key);
+        failGetAll(++key);
 
-        //GridNearGetResponse unmarshalling failed test
+        // GridNearGetResponse unmarshalling failed test.
         readCnt.set(Integer.MAX_VALUE);
 
         jcache(0).put(new TestKey(String.valueOf(++key)), "");
 
         readCnt.set(2);
 
-        failAtomicGet(key);
+        failGetAll(key);
+
+        readCnt.set(Integer.MAX_VALUE);
+        valReadCnt.set(Integer.MAX_VALUE);
+
+        jcache(0).put(new TestKey(String.valueOf(++key)), new TestValue());
+
+        assertNotNull(new TestKey(String.valueOf(key)));
+
+        // GridNearSingleGetRequest unmarshalling failed.
+        readCnt.set(1);
+
+        failGet(key);
+
+        // GridNearSingleGetResponse unmarshalling failed.
+        valReadCnt.set(1);
+        readCnt.set(2);
+
+        failGet(key);
+    }
+
+    /**
+     * Test key.
+     */
+    protected static class TestKey implements Externalizable {
+        /** Field. */
+        @QuerySqlField(index = true)
+        private String field;
+
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestKey() {
+            // No-op.
+        }
+
+        /**
+         * @param field Test key 1.
+         */
+        public TestKey(String field) {
+            this.field = field;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object o) {
+            if (this == o)
+                return true;
+            if (o == null || getClass() != o.getClass())
+                return false;
+
+            TestKey key = (TestKey)o;
+
+            return !(field != null ? !field.equals(key.field) : key.field != 
null);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return field != null ? field.hashCode() : 0;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            out.writeObject(field);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            field = (String)in.readObject();
+
+            if (readCnt.decrementAndGet() <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
+    }
+
+    /**
+     * Test value.
+     */
+    protected static class TestValue implements Externalizable {
+        /**
+         * Required by {@link Externalizable}.
+         */
+        public TestValue() {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writeExternal(ObjectOutput out) throws 
IOException {
+            // No-op.
+        }
+
+        /** {@inheritDoc} */
+        @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
+            if (valReadCnt.decrementAndGet() <= 0)
+                throw new IOException("Class can not be unmarshalled.");
+        }
     }
 }
\ No newline at end of file

Reply via email to