Optimizations for 'single' (single-key) operations on the tx cache.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/007ac2b1 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/007ac2b1 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/007ac2b1 Branch: refs/heads/ignite-single-op-get Commit: 007ac2b1fcc50ad9e610fd2a689e9f9353e1ff78 Parents: 58749e3 Author: sboikov <sboi...@gridgain.com> Authored: Tue Nov 17 20:00:41 2015 +0300 Committer: sboikov <sboi...@gridgain.com> Committed: Tue Nov 17 22:16:34 2015 +0300 ---------------------------------------------------------------------- .../processors/cache/GridCacheAdapter.java | 38 ++-- .../processors/cache/GridCacheIoManager.java | 19 +- .../distributed/dht/GridDhtCacheAdapter.java | 3 + .../dht/GridPartitionedGetFuture.java | 4 +- .../dht/GridPartitionedSingleGetFuture.java | 144 +++++++++------ .../near/GridNearSingleGetResponse.java | 17 ++ .../IgniteCacheP2pUnmarshallingErrorTest.java | 184 +++++++++++++------ 7 files changed, 259 insertions(+), 150 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java index 14b45a2..32c1a2c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java @@ -599,11 +599,11 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V } /** {@inheritDoc} */ - @Override public IgniteInternalFuture<Boolean> containsKeyAsync(K key) { + @Override public final 
IgniteInternalFuture<Boolean> containsKeyAsync(K key) { A.notNull(key, "key"); - return getAllAsync( - Collections.singletonList(key), + return (IgniteInternalFuture)getAsync( + key, /*force primary*/false, /*skip tx*/false, /*subj id*/null, @@ -611,15 +611,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V /*deserialize portable*/false, /*skip values*/true, /*can remap*/true - ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { - @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { - Map<K, V> map = fut.get(); - - assert map.isEmpty() || map.size() == 1 : map.size(); - - return map.isEmpty() ? false : map.values().iterator().next() != null; - } - }); + ); } /** {@inheritDoc} */ @@ -1484,14 +1476,14 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V * @return Future for the get operation. */ protected IgniteInternalFuture<V> getAsync( - final K key, - boolean forcePrimary, - boolean skipTx, - @Nullable UUID subjId, - String taskName, - boolean deserializePortable, - boolean skipVals, - boolean canRemap + final K key, + boolean forcePrimary, + boolean skipTx, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + final boolean skipVals, + boolean canRemap ) { return getAllAsync(Collections.singletonList(key), forcePrimary, @@ -1505,6 +1497,12 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { Map<K, V> map = e.get(); + if (skipVals) { + Boolean val = !map.isEmpty(); + + return (V)(val); + } + assert map.isEmpty() || map.size() == 1 : map.size(); return map.get(key); http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java ---------------------------------------------------------------------- diff 
--git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java index ad1704d..50fc1bb 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheIoManager.java @@ -515,12 +515,12 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { GridNearSingleGetRequest req = (GridNearSingleGetRequest)msg; GridNearSingleGetResponse res = new GridNearSingleGetResponse( - ctx.cacheId(), - req.futureId(), - req.topologyVersion(), - null, - false, - req.deployInfo() != null); + ctx.cacheId(), + req.futureId(), + req.topologyVersion(), + null, + false, + req.deployInfo() != null); res.error(req.classError()); @@ -532,10 +532,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { case 116: { GridNearSingleGetResponse res = (GridNearSingleGetResponse)msg; - GridCacheFuture fut = ctx.mvcc().future(res.futureId()); - - if (fut == null) - fut = ctx.mvcc().future(res.futureId()); + GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) @@ -546,7 +543,7 @@ public class GridCacheIoManager extends GridCacheSharedManagerAdapter { res.error(res.classError()); - ((GridPartitionedSingleGetFuture)fut).onResult(nodeId, res); + fut.onResult(nodeId, res); } break; http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 96b15f3..8537357 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -767,6 +767,9 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap res0, false, req.addDeploymentInfo()); + + if (info != null && req.skipValues()) + res.setContainsValue(); } else { res = new GridNearSingleGetResponse(ctx.cacheId(), http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index 6be56e0..5a9b6c7 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -21,7 +21,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.LinkedHashMap; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; @@ -392,7 +391,8 @@ public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAda boolean remote = false; // Allow to get cached value from the local node. 
- boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); + boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || + cctx.affinity().primary(cctx.localNode(), key, topVer); while (true) { GridCacheEntryEx entry; http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java index e19c9b0..4a5c71e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -188,6 +188,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im /** * @param topVer Topology version. */ + @SuppressWarnings("unchecked") private void map(AffinityTopologyVersion topVer) { ClusterNode node = mapKeyToNode(topVer); @@ -375,7 +376,14 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im colocated.removeIfObsolete(key); } else { - setResult(v, ver); + if (!skipVals) { + if (cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); + + setResult(v, ver); + } + else + setSkipValueResult(true, ver); return null; } @@ -405,24 +413,34 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im * @param res Result. 
*/ public void onResult(UUID nodeId, GridNearSingleGetResponse res) { - if (!processResponse(nodeId)) + if (!processResponse(nodeId) || !checkError(res.error(), res.invalidPartitions(), res.topologyVersion())) return; Message res0 = res.result(); - CacheObject val; - GridCacheVersion ver = null; - if (needVer) { CacheVersionedValue verVal = (CacheVersionedValue)res0; - val = verVal.value(); - ver = verVal.version(); + if (verVal != null) { + if (skipVals) + setSkipValueResult(true, verVal.version()); + else + setResult(null , null); + } + else { + if (skipVals) + setSkipValueResult(false, null); + else + setResult(null , null); + } + } + else { + if (skipVals) { + setSkipValueResult(res.containsValue(), null); + } + else + setResult((CacheObject)res0, null); } - else - val = (CacheObject)res0; - - onResult(res.error(), res.invalidPartitions(), res.topologyVersion(), val, ver); } /** @@ -430,14 +448,15 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im * @param res Result. */ public void onResult(UUID nodeId, GridNearGetResponse res) { - if (!processResponse(nodeId)) + if (!processResponse(nodeId) || + !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion())) return; Collection<GridCacheEntryInfo> infos = res.entries(); assert F.isEmpty(infos) || infos.size() == 1 : infos; - onResult(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), F.first(infos)); + setResult(F.first(infos)); } /** @@ -460,37 +479,14 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im * @param err Error. * @param invalidParts Invalid partitions error flag. * @param rmtTopVer Received topology version. - * @param info Entry info. 
- */ - private void onResult(@Nullable IgniteCheckedException err, - boolean invalidParts, - AffinityTopologyVersion rmtTopVer, - @Nullable GridCacheEntryInfo info) { - if (info != null) { - assert skipVals == (info.value() == null); - - onResult(err, invalidParts, rmtTopVer, info.value(), info.version()); - } - else - onResult(err, invalidParts, rmtTopVer, null, null); - } - - /** - * @param err Error. - * @param invalidParts Invalid partitions error flag. - * @param rmtTopVer Received topology version. - * @param val Value. - * @param ver Version. */ - private void onResult(@Nullable IgniteCheckedException err, + private boolean checkError(@Nullable IgniteCheckedException err, boolean invalidParts, - AffinityTopologyVersion rmtTopVer, - @Nullable CacheObject val, - @Nullable GridCacheVersion ver) { + AffinityTopologyVersion rmtTopVer) { if (err != null) { onDone(err); - return; + return false; } if (invalidParts) { @@ -503,7 +499,7 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) + ", nodeId=" + node.id() + ']')); - return; + return false; } if (canRemap) { @@ -525,22 +521,47 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im } else map(topVer); + + return false; } - else - setResult(val, ver); + + return true; } /** * @param info Entry info. */ private void setResult(@Nullable GridCacheEntryInfo info) { - if (info != null) { - assert skipVals == (info.value() == null); + assert info == null || skipVals == (info.value() == null); - setResult(info.value(), info.version()); + if (skipVals) { + if (info != null) + setSkipValueResult(true, info.version()); + else + setSkipValueResult(false, null); + } + else { + if (info != null) + setResult(info.value(), info.version()); + else + setResult(null, null); + } + } + + /** + * @param res Result. + * @param ver Version. 
+ */ + private void setSkipValueResult(boolean res, @Nullable GridCacheVersion ver) { + assert skipVals; + + if (needVer) { + assert ver != null || !res; + + onDone(new T2<>(res, ver)); } else - onDone(skipVals ? false : null); + onDone(res); } /** @@ -549,24 +570,29 @@ public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> im */ private void setResult(@Nullable CacheObject val, @Nullable GridCacheVersion ver) { try { - if (needVer) { - assert ver != null; - assert skipVals || val != null; + assert !skipVals; - onDone(new T2<>(skipVals ? true : val, ver)); - } - else { - if (!keepCacheObjects) { - Object res = skipVals ? true : CU.value(val, cctx, true); + if (val != null) { + if (needVer) { + assert ver != null; + + onDone(new T2<>(val, ver)); + } + else { + if (!keepCacheObjects) { + Object res = CU.value(val, cctx, true); - if (deserializePortable && !skipVals) - res = cctx.unwrapPortableIfNeeded(res, false); + if (deserializePortable && !skipVals) + res = cctx.unwrapPortableIfNeeded(res, false); - onDone(res); + onDone(res); + } + else + onDone(val); } - else - onDone(skipVals ? 
true : val); } + else + onDone(null); } catch (Exception e) { onDone(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java index eedf1e6..66b4d9b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetResponse.java @@ -44,6 +44,9 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC /** */ public static final int INVALID_PART_FLAG_MASK = 0x1; + /** */ + public static final int CONTAINS_VAL_FLAG_MASK = 0x2; + /** Future ID. */ private IgniteUuid futId; @@ -125,6 +128,20 @@ public class GridNearSingleGetResponse extends GridCacheMessage implements GridC } /** + * @return Results for request with set flag {@link GridNearSingleGetRequest#skipValues()}. + */ + public boolean containsValue() { + return (flags & CONTAINS_VAL_FLAG_MASK) != 0; + } + + /** + * + */ + public void setContainsValue() { + flags = (byte)(flags | CONTAINS_VAL_FLAG_MASK); + } + + /** * @return Result. 
*/ public Message result() { http://git-wip-us.apache.org/repos/asf/ignite/blob/007ac2b1/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java index f4423f7..9c1abc7 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheP2pUnmarshallingErrorTest.java @@ -21,6 +21,7 @@ import java.io.Externalizable; import java.io.IOException; import java.io.ObjectInput; import java.io.ObjectOutput; +import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; import javax.cache.CacheException; import org.apache.ignite.cache.CacheAtomicWriteOrderMode; @@ -30,6 +31,7 @@ import org.apache.ignite.cache.query.annotations.QuerySqlField; import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.NearCacheConfiguration; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; /** @@ -37,7 +39,10 @@ import org.apache.ignite.internal.util.typedef.X; */ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTest { /** Allows to change behavior of readExternal method. */ - protected static AtomicInteger readCnt = new AtomicInteger(); + protected static final AtomicInteger readCnt = new AtomicInteger(); + + /** Allows to change behavior of readExternal method. */ + protected static final AtomicInteger valReadCnt = new AtomicInteger(); /** Iterable key. 
*/ protected static int key = 0; @@ -86,71 +91,40 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes return cfg; } - /** Test key 1. */ - protected static class TestKey implements Externalizable { - /** Field. */ - @QuerySqlField(index = true) - private String field; - - /** - * @param field Test key 1. - */ - public TestKey(String field) { - this.field = field; - } - - /** Test key 1. */ - public TestKey() { - // No-op. - } - - /** {@inheritDoc} */ - @Override public boolean equals(Object o) { - if (this == o) - return true; - if (o == null || getClass() != o.getClass()) - return false; - - TestKey key = (TestKey)o; - - return !(field != null ? !field.equals(key.field) : key.field != null); - } + /** + * Sends put atomically and handles fail. + * + * @param k Key. + */ + protected void failAtomicPut(int k) { + try { + jcache(0).put(new TestKey(String.valueOf(k)), ""); - /** {@inheritDoc} */ - @Override public int hashCode() { - return field != null ? field.hashCode() : 0; + assert false : "p2p marshalling failed, but error response was not sent"; } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(field); + catch (CacheException e) { + assert X.hasCause(e, IOException.class); } - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - field = (String)in.readObject(); - - if (readCnt.decrementAndGet() <= 0) - throw new IOException("Class can not be unmarshalled."); - } + assert readCnt.get() == 0; //ensure we have read count as expected. } /** - * Sends put atomically and handles fail. + * Sends get atomically and handles fail. * * @param k Key. 
*/ - protected void failAtomicPut(int k) { + protected void failGetAll(int k) { try { - jcache(0).put(new TestKey(String.valueOf(k)), ""); + Set<Object> keys = F.<Object>asSet(new TestKey(String.valueOf(k))); + + jcache(0).getAll(keys); assert false : "p2p marshalling failed, but error response was not sent"; } catch (CacheException e) { assert X.hasCause(e, IOException.class); } - - assert readCnt.get() == 0; //ensure we have read count as expected. } /** @@ -158,7 +132,7 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes * * @param k Key. */ - protected void failAtomicGet(int k) { + protected void failGet(int k) { try { jcache(0).get(new TestKey(String.valueOf(k))); @@ -175,38 +149,132 @@ public class IgniteCacheP2pUnmarshallingErrorTest extends IgniteCacheAbstractTes * @throws Exception If failed. */ public void testResponseMessageOnUnmarshallingFailed() throws Exception { - //GridNearAtomicUpdateRequest unmarshalling failed test + // GridNearAtomicUpdateRequest unmarshalling failed test. readCnt.set(1); failAtomicPut(++key); - //Check that cache is empty. + // Check that cache is empty. readCnt.set(Integer.MAX_VALUE); assert jcache(0).get(new TestKey(String.valueOf(key))) == null; - //GridDhtAtomicUpdateRequest unmarshalling failed test + // GridDhtAtomicUpdateRequest unmarshalling failed test. readCnt.set(2); failAtomicPut(++key); - //Check that cache is not empty. + // Check that cache is not empty. readCnt.set(Integer.MAX_VALUE); assert jcache(0).get(new TestKey(String.valueOf(key))) != null; - //GridNearGetRequest unmarshalling failed test + // GridNearGetRequest unmarshalling failed test. readCnt.set(1); - failAtomicGet(++key); + failGetAll(++key); - //GridNearGetResponse unmarshalling failed test + // GridNearGetResponse unmarshalling failed test. 
readCnt.set(Integer.MAX_VALUE); jcache(0).put(new TestKey(String.valueOf(++key)), ""); readCnt.set(2); - failAtomicGet(key); + failGetAll(key); + + readCnt.set(Integer.MAX_VALUE); + valReadCnt.set(Integer.MAX_VALUE); + + jcache(0).put(new TestKey(String.valueOf(++key)), new TestValue()); + + assertNotNull(new TestKey(String.valueOf(key))); + + // GridNearSingleGetRequest unmarshalling failed. + readCnt.set(1); + + failGet(key); + + // GridNearSingleGetRequest unmarshalling failed. + valReadCnt.set(1); + readCnt.set(2); + + failGet(key); + } + + /** + * Test key. + */ + protected static class TestKey implements Externalizable { + /** Field. */ + @QuerySqlField(index = true) + private String field; + + /** + * Required by {@link Externalizable}. + */ + public TestKey() { + // No-op. + } + + /** + * @param field Test key 1. + */ + public TestKey(String field) { + this.field = field; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + TestKey key = (TestKey)o; + + return !(field != null ? !field.equals(key.field) : key.field != null); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return field != null ? field.hashCode() : 0; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(field); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + field = (String)in.readObject(); + + if (readCnt.decrementAndGet() <= 0) + throw new IOException("Class can not be unmarshalled."); + } + } + + /** + * Test value. + */ + protected static class TestValue implements Externalizable { + /** + * Required by {@link Externalizable}. + */ + public TestValue() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. 
+ } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + if (valReadCnt.decrementAndGet() <= 0) + throw new IOException("Class can not be unmarshalled."); + } } } \ No newline at end of file