http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java new file mode 100644 index 0000000..8f2357b --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedSingleGetFuture.java @@ -0,0 +1,697 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; +import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; +import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.CI1; +import 
org.apache.ignite.internal.util.typedef.CIX1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteProductVersion; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +/** + * + */ +public class GridPartitionedSingleGetFuture extends GridFutureAdapter<Object> implements GridCacheFuture<Object>, + CacheGetFuture { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + public static final IgniteProductVersion SINGLE_GET_MSG_SINCE = IgniteProductVersion.fromString("1.5.0"); + + /** Logger reference. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** Logger. */ + private static IgniteLogger log; + + /** Topology version. */ + private AffinityTopologyVersion topVer; + + /** Context. */ + private final GridCacheContext cctx; + + /** Key. */ + private final KeyCacheObject key; + + /** Read through flag. */ + private final boolean readThrough; + + /** Force primary flag. */ + private final boolean forcePrimary; + + /** Future ID. */ + private final IgniteUuid futId; + + /** Trackable flag. */ + private boolean trackable; + + /** Subject ID. */ + private final UUID subjId; + + /** Task name. */ + private final String taskName; + + /** Whether to deserialize portable objects. */ + private boolean deserializePortable; + + /** Skip values flag. */ + private boolean skipVals; + + /** Expiry policy. */ + private IgniteCacheExpiryPolicy expiryPlc; + + /** Flag indicating that get should be done on a locked topology version. 
*/ + private final boolean canRemap; + + /** */ + private final boolean needVer; + + /** */ + private final boolean keepCacheObjects; + + /** */ + private ClusterNode node; + + /** + * @param cctx Context. + * @param key Key. + * @param topVer Topology version. + * @param readThrough Read through flag. + * @param forcePrimary If {@code true} then will force network trip to primary node even if called on backup node. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObjects Keep cache objects flag. + */ + public GridPartitionedSingleGetFuture( + GridCacheContext cctx, + KeyCacheObject key, + AffinityTopologyVersion topVer, + boolean readThrough, + boolean forcePrimary, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals, + boolean canRemap, + boolean needVer, + boolean keepCacheObjects + ) { + assert key != null; + + this.cctx = cctx; + this.key = key; + this.readThrough = readThrough; + this.forcePrimary = forcePrimary; + this.subjId = subjId; + this.taskName = taskName; + this.deserializePortable = deserializePortable; + this.expiryPlc = expiryPlc; + this.skipVals = skipVals; + this.canRemap = canRemap; + this.needVer = needVer; + this.keepCacheObjects = keepCacheObjects; + this.topVer = topVer; + + futId = IgniteUuid.randomUuid(); + + if (log == null) + log = U.logger(cctx.kernalContext(), logRef, GridPartitionedSingleGetFuture.class); + } + + /** + * + */ + public void init() { + AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : + canRemap ? 
cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion(); + + map(topVer); + } + + /** + * @param topVer Topology version. + */ + @SuppressWarnings("unchecked") + private void map(AffinityTopologyVersion topVer) { + this.topVer = topVer; + + ClusterNode node = mapKeyToNode(topVer); + + if (node == null) { + assert isDone() : this; + + return; + } + + if (node.isLocal()) { + LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1); + + map.put(key, false); + + final GridDhtFuture<Collection<GridCacheEntryInfo>> fut = cctx.dht().getDhtAsync(node.id(), + -1, + map, + readThrough, + topVer, + subjId, + taskName == null ? 0 : taskName.hashCode(), + expiryPlc, + skipVals); + + final Collection<Integer> invalidParts = fut.invalidPartitions(); + + if (!F.isEmpty(invalidParts)) { + AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx(); + + assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology " + + "version did not change [topVer=" + topVer + ", updTopVer=" + updTopVer + + ", invalidParts=" + invalidParts + ']'; + + // Remap recursively. + map(updTopVer); + } + else { + fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() { + @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut) { + try { + Collection<GridCacheEntryInfo> infos = fut.get(); + + assert F.isEmpty(infos) || infos.size() == 1 : infos; + + setResult(F.first(infos)); + } + catch (Exception e) { + U.error(log, "Failed to get values from dht cache [fut=" + fut + "]", e); + + onDone(e); + } + } + }); + } + } + else { + synchronized (this) { + this.node = node; + } + + if (!trackable) { + trackable = true; + + cctx.mvcc().addFuture(this, futId); + } + + GridCacheMessage req; + + if (node.version().compareTo(SINGLE_GET_MSG_SINCE) >= 0) { + req = new GridNearSingleGetRequest(cctx.cacheId(), + futId, + key, + readThrough, + topVer, + subjId, + taskName == null ? 
0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forAccess() : -1L, + skipVals, + /**add reader*/false, + needVer, + cctx.deploymentEnabled()); + } + else { + LinkedHashMap<KeyCacheObject, Boolean> map = U.newLinkedHashMap(1); + + map.put(key, false); + + req = new GridNearGetRequest( + cctx.cacheId(), + futId, + futId, + cctx.versions().next(), + map, + readThrough, + topVer, + subjId, + taskName == null ? 0 : taskName.hashCode(), + expiryPlc != null ? expiryPlc.forAccess() : -1L, + skipVals, + cctx.deploymentEnabled()); + } + + try { + cctx.io().send(node, req, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + if (e instanceof ClusterTopologyCheckedException) + onNodeLeft(node.id()); + else + onDone(e); + } + } + } + + /** + * @param topVer Topology version. + * @return Primary node or {@code null} if future was completed. + */ + @Nullable private ClusterNode mapKeyToNode(AffinityTopologyVersion topVer) { + ClusterNode primary = affinityNode(key, topVer); + + if (primary == null) { + onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + + "(all partition nodes left the grid) [topVer=" + topVer + ", cache=" + cctx.name() + ']')); + + return null; + } + + boolean allowLocRead = (cctx.affinityNode() && !forcePrimary) || primary.isLocal(); + + if (allowLocRead) { + GridDhtCacheAdapter colocated = cctx.dht(); + + while (true) { + GridCacheEntryEx entry; + + try { + entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : + colocated.peekEx(key); + + // If our DHT cache do has value, then we peek it. 
+ if (entry != null) { + boolean isNew = entry.isNewLocked(); + + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } + + colocated.context().evicts().touch(entry, topVer); + + // Entry was not in memory or in swap, so we remove it from cache. + if (v == null) { + if (isNew && entry.markObsoleteIfEmpty(ver)) + colocated.removeIfObsolete(key); + } + else { + if (!skipVals && cctx.config().isStatisticsEnabled()) + cctx.cache().metrics0().onRead(true); + + if (!skipVals) + setResult(v, ver); + else + setSkipValueResult(true, ver); + + return null; + } + } + + break; + } + catch (GridDhtInvalidPartitionException ignored) { + break; + } + catch (IgniteCheckedException e) { + onDone(e); + + return null; + } + catch (GridCacheEntryRemovedException ignored) { + // No-op, will retry. + } + } + } + + return primary; + } + + /** + * @param nodeId Node ID. + * @param res Result. 
+     */
+    public void onResult(UUID nodeId, GridNearSingleGetResponse res) {
+        // Skip responses from nodes this future is not waiting on, and responses that
+        // carry an error or an invalid-partitions flag (checkError() handles those).
+        if (!processResponse(nodeId) ||
+            !checkError(res.error(), res.invalidPartitions(), res.topologyVersion(), nodeId))
+            return;
+
+        Message res0 = res.result();
+
+        if (needVer) {
+            // With 'needVer' the payload is a value/version pair, or null if nothing was returned.
+            CacheVersionedValue verVal = (CacheVersionedValue)res0;
+
+            if (verVal != null) {
+                if (skipVals)
+                    setSkipValueResult(true, verVal.version());
+                else
+                    setResult(verVal.value(), verVal.version());
+            }
+            else {
+                if (skipVals)
+                    setSkipValueResult(false, null);
+                else
+                    setResult(null, null);
+            }
+        }
+        else {
+            if (skipVals)
+                setSkipValueResult(res.containsValue(), null);
+            else
+                setResult((CacheObject)res0, null);
+        }
+    }
+
+    /**
+     * Handles a multi-key get response, which is expected to hold at most one entry.
+     *
+     * @param nodeId Node ID.
+     * @param res Result.
+     */
+    public void onResult(UUID nodeId, GridNearGetResponse res) {
+        if (!processResponse(nodeId) ||
+            !checkError(res.error(), !F.isEmpty(res.invalidPartitions()), res.topologyVersion(), nodeId))
+            return;
+
+        Collection<GridCacheEntryInfo> infos = res.entries();
+
+        assert F.isEmpty(infos) || infos.size() == 1 : infos;
+
+        setResult(F.first(infos));
+    }
+
+    /**
+     * Checks that the given node is the one this future currently waits for and, if so, resets the
+     * {@code node} field to {@code null}, so that a repeated response or node-left event for the
+     * same node is not processed again.
+     *
+     * @param nodeId Node ID.
+     * @return {@code True} if should process received response.
+     */
+    private boolean processResponse(UUID nodeId) {
+        synchronized (this) {
+            if (node != null && node.id().equals(nodeId)) {
+                node = null;
+
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Checks a get response for failures before its payload is used.
+     * <p>
+     * An error completes the future with that error. An invalid-partitions reply either fails the
+     * future (when the remote topology version is not newer than the version this future was mapped
+     * on) or starts another mapping attempt.
+     *
+     * @param err Error.
+     * @param invalidParts Invalid partitions error flag.
+     * @param rmtTopVer Received topology version.
+     * @param nodeId ID of the node that sent the response (used for error reporting only).
+     * @return {@code True} if the response result should be applied, {@code false} if the future was
+     *      completed with an error or another mapping attempt was started.
+     */
+    private boolean checkError(@Nullable IgniteCheckedException err,
+        boolean invalidParts,
+        AffinityTopologyVersion rmtTopVer,
+        UUID nodeId) {
+        if (err != null) {
+            onDone(err);
+
+            return false;
+        }
+
+        if (invalidParts) {
+            assert !rmtTopVer.equals(AffinityTopologyVersion.ZERO);
+
+            if (rmtTopVer.compareTo(topVer) <= 0) {
+                // Fail the whole get future.
+                // Report the sender ID passed by the caller: processResponse() has already reset the
+                // 'node' field to null, so dereferencing it here would throw NullPointerException.
+                onDone(new IgniteCheckedException("Failed to process invalid partitions response (remote node reported " +
+                    "invalid partitions but remote topology version does not differ from local) " +
+                    "[topVer=" + topVer + ", rmtTopVer=" + rmtTopVer + ", part=" + cctx.affinity().partition(key) +
+                    ", nodeId=" + nodeId + ']'));
+
+                return false;
+            }
+
+            if (canRemap) {
+                // Wait for the topology version reported by the remote node, then map again on it.
+                IgniteInternalFuture<Long> topFut = cctx.discovery().topologyFuture(rmtTopVer.topologyVersion());
+
+                topFut.listen(new CIX1<IgniteInternalFuture<Long>>() {
+                    @Override public void applyx(IgniteInternalFuture<Long> fut) {
+                        try {
+                            AffinityTopologyVersion topVer = new AffinityTopologyVersion(fut.get());
+
+                            remap(topVer);
+                        }
+                        catch (IgniteCheckedException e) {
+                            onDone(e);
+                        }
+                    }
+                });
+            }
+            else
+                map(topVer); // Remapping is not allowed: map again on the same topology version.
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * Completes the future from an entry info, or with an empty result if the info is {@code null}.
+     *
+     * @param info Entry info.
+     */
+    private void setResult(@Nullable GridCacheEntryInfo info) {
+        assert info == null || skipVals == (info.value() == null);
+
+        if (skipVals) {
+            if (info != null)
+                setSkipValueResult(true, info.version());
+            else
+                setSkipValueResult(false, null);
+        }
+        else {
+            if (info != null)
+                setResult(info.value(), info.version());
+            else
+                setResult(null, null);
+        }
+    }
+
+    /**
+     * Completes the future for a skip-values get: with the flag itself, or with a tuple of the flag
+     * and the version when {@code needVer} is set.
+     *
+     * @param res Result.
+     * @param ver Version.
+     */
+    private void setSkipValueResult(boolean res, @Nullable GridCacheVersion ver) {
+        assert skipVals;
+
+        if (needVer) {
+            assert ver != null || !res;
+
+            onDone(new T2<>(res, ver));
+        }
+        else
+            onDone(res);
+    }
+
+    /**
+     * @param val Value.
+     * @param ver Version.
+ */ + private void setResult(@Nullable CacheObject val, @Nullable GridCacheVersion ver) { + try { + assert !skipVals; + + if (val != null) { + if (needVer) { + assert ver != null; + + onDone(new T2<>(val, ver)); + } + else { + if (!keepCacheObjects) { + Object res = CU.value(val, cctx, true); + + if (deserializePortable && !skipVals) + res = cctx.unwrapPortableIfNeeded(res, false); + + onDone(res); + } + else + onDone(val); + } + } + else + onDone(null); + } + catch (Exception e) { + onDone(e); + } + } + + /** + * Affinity node to send get request to. + * + * @param key Key to get. + * @param topVer Topology version. + * @return Affinity node to get key from. + */ + private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { + if (!canRemap) { + List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); + + for (ClusterNode node : affNodes) { + if (cctx.discovery().alive(node)) + return node; + } + + return null; + } + else + return cctx.affinity().primary(key, topVer); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid futureId() { + return futId; + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + if (!processResponse(nodeId)) + return false; + + if (canRemap) { + final AffinityTopologyVersion updTopVer = new AffinityTopologyVersion( + Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion())); + + cctx.affinity().affinityReadyFuture(updTopVer).listen( + new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { + @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) { + try { + fut.get(); + + remap(updTopVer); + } + catch (IgniteCheckedException e) { + onDone(e); + } + } + }); + } + else + remap(topVer); + + return true; + } + + /** + * @param topVer Topology version. 
+ */ + private void remap(final AffinityTopologyVersion topVer) { + cctx.closures().runLocalSafe(new Runnable() { + @Override public void run() { + map(topVer); + } + }); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(Object res, Throwable err) { + if (super.onDone(res, err)) { + // Don't forget to clean up. + if (trackable) + cctx.mvcc().removeFuture(futId); + + cctx.dht().sendTtlUpdateRequest(expiryPlc); + + return true; + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean trackable() { + return trackable; + } + + /** {@inheritDoc} */ + @Override public void markNotTrackable() { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridPartitionedSingleGetFuture.class, this); + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index 7f9edb2..75f8c2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -65,11 +65,14 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheE import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; import 
org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; @@ -242,6 +245,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } }); + ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetRequest.class, new CI2<UUID, GridNearSingleGetRequest>() { + @Override public void apply(UUID nodeId, GridNearSingleGetRequest req) { + processNearSingleGetRequest(nodeId, req); + } + }); + ctx.io().addHandler(ctx.cacheId(), GridNearAtomicUpdateRequest.class, new CI2<UUID, GridNearAtomicUpdateRequest>() { @Override public void apply(UUID nodeId, GridNearAtomicUpdateRequest req) { processNearAtomicUpdateRequest(nodeId, req); @@ -279,6 +288,12 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { processNearGetResponse(nodeId, res); } }); + + ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { + @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { + processNearSingleGetResponse(nodeId, res); + } + }); } } @@ -301,6 +316,45 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** {@inheritDoc} */ + @Override protected IgniteInternalFuture<V> getAsync(final K key, + final boolean forcePrimary, + final boolean skipTx, + @Nullable UUID subjId, + final String taskName, + final boolean deserializePortable, + final boolean skipVals, + final boolean canRemap) { + ctx.checkSecurity(SecurityPermission.CACHE_READ); + + if (keyCheck) + validateCacheKey(key); + + CacheOperationContext opCtx = ctx.operationContextPerCall(); + + subjId = ctx.subjectIdPerCall(null, opCtx); + + final UUID subjId0 = subjId; + + final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? 
opCtx.expiry() : null; + + final boolean skipStore = opCtx != null && opCtx.skipStore(); + + return asyncOp(new CO<IgniteInternalFuture<V>>() { + @Override public IgniteInternalFuture<V> apply() { + return getAsync0(ctx.toCacheKeyObject(key), + forcePrimary, + subjId0, + taskName, + deserializePortable, + expiryPlc, + skipVals, + skipStore, + canRemap); + } + }); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, final boolean forcePrimary, @@ -914,9 +968,57 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { } /** + * Entry point to all public API single get methods. + * + * @param key Key. + * @param forcePrimary Force primary flag. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @param skipStore Skip store flag. + * @param canRemap Can remap flag. + * @return Get future. + */ + private IgniteInternalFuture<V> getAsync0(KeyCacheObject key, + boolean forcePrimary, + UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable ExpiryPolicy expiryPlc, + boolean skipVals, + boolean skipStore, + boolean canRemap + ) { + AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : + ctx.shared().exchange().readyAffinityVersion(); + + IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); + + GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, + key, + topVer, + !skipStore, + forcePrimary, + subjId, + taskName, + deserializePortable, + expiry, + skipVals, + canRemap, + false, + false); + + fut.init(); + + return (IgniteInternalFuture<V>)fut; + } + + /** * Entry point to all public API get methods. * - * @param keys Keys to remove. + * @param keys Keys. * @param forcePrimary Force primary flag. * @param subjId Subject ID. 
* @param taskName Task name. @@ -942,7 +1044,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); // Optimisation: try to resolve value locally and escape 'get future' creation. - if (!forcePrimary) { + if (!forcePrimary && ctx.affinityNode()) { Map<K, V> locVals = U.newHashMap(keys.size()); boolean success = true; @@ -2409,27 +2511,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { /** * @param nodeId Sender node ID. - * @param res Near get response. - */ - private void processNearGetResponse(UUID nodeId, GridNearGetResponse res) { - if (log.isDebugEnabled()) - log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']'); - - GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future( - res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); - - return; - } - - fut.onResult(nodeId, res); - } - - /** - * @param nodeId Sender node ID. * @param req Near atomic update request. 
*/ private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicUpdateRequest req) { http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java index 4ace5c4..c34dcfd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateFuture.java @@ -22,7 +22,6 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.UUID; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; import javax.cache.processor.EntryProcessor; import org.apache.ignite.IgniteCheckedException; @@ -48,7 +47,6 @@ import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import org.jsr166.ConcurrentHashMap8; import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; @@ -67,39 +65,42 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> protected static IgniteLogger log; /** Cache context. */ - private GridCacheContext cctx; + private final GridCacheContext cctx; /** Future version. */ - private GridCacheVersion futVer; + private final GridCacheVersion futVer; /** Write version. 
*/ - private GridCacheVersion writeVer; + private final GridCacheVersion writeVer; /** Force transform backup flag. */ private boolean forceTransformBackups; /** Completion callback. */ @GridToStringExclude - private CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb; + private final CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse> completionCb; /** Mappings. */ @GridToStringInclude - private ConcurrentMap<UUID, GridDhtAtomicUpdateRequest> mappings = new ConcurrentHashMap8<>(); + private final Map<UUID, GridDhtAtomicUpdateRequest> mappings; /** Entries with readers. */ private Map<KeyCacheObject, GridDhtCacheEntry> nearReadersEntries; /** Update request. */ - private GridNearAtomicUpdateRequest updateReq; + private final GridNearAtomicUpdateRequest updateReq; /** Update response. */ - private GridNearAtomicUpdateResponse updateRes; + private final GridNearAtomicUpdateResponse updateRes; /** Future keys. */ - private Collection<KeyCacheObject> keys; + private final Collection<KeyCacheObject> keys; /** */ - private boolean waitForExchange; + private final boolean waitForExchange; + + /** Response count. */ + private volatile int resCnt; /** * @param cctx Cache context. @@ -128,6 +129,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> log = U.logger(cctx.kernalContext(), logRef, GridDhtAtomicUpdateFuture.class); keys = new ArrayList<>(updateReq.keys().size()); + mappings = U.newHashMap(updateReq.keys().size()); boolean topLocked = updateReq.topologyLocked() || (updateReq.fastMap() && !updateReq.clientRequest()); @@ -145,22 +147,37 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } /** {@inheritDoc} */ - @Override public Collection<? 
extends ClusterNode> nodes() { - return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull()); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { if (log.isDebugEnabled()) log.debug("Processing node leave event [fut=" + this + ", nodeId=" + nodeId + ']'); + return registerResponse(nodeId); + } + + /** + * @param nodeId Node ID. + * @return {@code True} if request found. + */ + private boolean registerResponse(UUID nodeId) { + int resCnt0; + GridDhtAtomicUpdateRequest req = mappings.get(nodeId); if (req != null) { - // Remove only after added keys to failed set. - mappings.remove(nodeId); + synchronized (this) { + if (req.onResponse()) { + resCnt0 = resCnt; + + resCnt0 += 1; + + resCnt = resCnt0; + } + else + return false; + } - checkComplete(); + if (resCnt0 == mappings.size()) + onDone(); return true; } @@ -343,18 +360,18 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> U.warn(log, "Failed to send update request to backup node because it left grid: " + req.nodeId()); - mappings.remove(req.nodeId()); + registerResponse(req.nodeId()); } catch (IgniteCheckedException e) { U.error(log, "Failed to send update request to backup node (did node leave the grid?): " + req.nodeId(), e); - mappings.remove(req.nodeId()); + registerResponse(req.nodeId()); } } } - - checkComplete(); + else + onDone(); // Send response right away if no ACKs from backup is required. // Backups will send ACKs anyway, future will be completed after all backups have replied. 
@@ -389,9 +406,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> } } - mappings.remove(nodeId); - - checkComplete(); + registerResponse(nodeId); } /** @@ -403,22 +418,7 @@ public class GridDhtAtomicUpdateFuture extends GridFutureAdapter<Void> if (log.isDebugEnabled()) log.debug("Received deferred DHT atomic update future result [nodeId=" + nodeId + ']'); - mappings.remove(nodeId); - - checkComplete(); - } - - /** - * Checks if all required responses are received. - */ - private void checkComplete() { - // Always wait for replies from all backups. - if (mappings.isEmpty()) { - if (log.isDebugEnabled()) - log.debug("Completing DHT atomic update future: " + this); - - onDone(); - } + registerResponse(nodeId); } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java index e55cac9..1219f2f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicUpdateRequest.java @@ -139,6 +139,10 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid /** Task name hash. */ private int taskNameHash; + /** On response flag. Access should be synced on future. */ + @GridDirectTransient + private boolean onRes; + /** * Empty constructor required by {@link Externalizable}. 
*/ @@ -527,6 +531,13 @@ public class GridDhtAtomicUpdateRequest extends GridCacheMessage implements Grid } /** + * @return {@code True} if on response flag changed. + */ + public boolean onResponse() { + return !onRes && (onRes = true); + } + + /** * @return Optional arguments for entry processor. */ @Nullable public Object[] invokeArguments() { http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java index ae662c8..a786803 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridNearAtomicUpdateFuture.java @@ -238,11 +238,6 @@ public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> return state.futureVersion(); } - /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - throw new UnsupportedOperationException(); - } - /** * @return {@code True} if this future should block partition map exchange. 
*/ http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 7131aa5..47b7aea 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -53,8 +53,10 @@ import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvali import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; @@ -67,6 +69,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.IgnitePair; import 
org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI2; +import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; @@ -138,7 +141,13 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte ctx.io().addHandler(ctx.cacheId(), GridNearGetResponse.class, new CI2<UUID, GridNearGetResponse>() { @Override public void apply(UUID nodeId, GridNearGetResponse res) { - processGetResponse(nodeId, res); + processNearGetResponse(nodeId, res); + } + }); + + ctx.io().addHandler(ctx.cacheId(), GridNearSingleGetResponse.class, new CI2<UUID, GridNearSingleGetResponse>() { + @Override public void apply(UUID nodeId, GridNearSingleGetResponse res) { + processNearSingleGetResponse(nodeId, res); } }); @@ -185,6 +194,80 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** {@inheritDoc} */ + @Override protected IgniteInternalFuture<V> getAsync(final K key, + boolean forcePrimary, + boolean skipTx, + @Nullable UUID subjId, + String taskName, + final boolean deserializePortable, + final boolean skipVals, + boolean canRemap) { + ctx.checkSecurity(SecurityPermission.CACHE_READ); + + if (keyCheck) + validateCacheKey(key); + + IgniteTxLocalAdapter tx = ctx.tm().threadLocalTx(ctx); + + final CacheOperationContext opCtx = ctx.operationContextPerCall(); + + if (tx != null && !tx.implicit() && !skipTx) { + return asyncOp(tx, new AsyncOp<V>() { + @Override public IgniteInternalFuture<V> op(IgniteTxLocalAdapter tx) { + IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx, + Collections.singleton(ctx.toCacheKeyObject(key)), + deserializePortable, + skipVals, + false, + opCtx != null && opCtx.skipStore()); + + return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() { + @SuppressWarnings("unchecked") + @Override public V 
applyx(IgniteInternalFuture<Map<Object, Object>> e) + throws IgniteCheckedException { + Map<Object, Object> map = e.get(); + + assert map.isEmpty() || map.size() == 1 : map.size(); + + if (skipVals) { + Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map); + + return (V)(val); + } + + return (V)map.get(key); + } + }); + } + }); + } + + AffinityTopologyVersion topVer = tx == null ? + (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) : + tx.topologyVersion(); + + subjId = ctx.subjectIdPerCall(subjId, opCtx); + + GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, + ctx.toCacheKeyObject(key), + topVer, + opCtx == null || !opCtx.skipStore(), + forcePrimary, + subjId, + taskName, + deserializePortable, + skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), + skipVals, + canRemap, + /*needVer*/false, + /*keepCacheObjects*/false); + + fut.init(); + + return (IgniteInternalFuture<V>)fut; + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( @Nullable final Collection<? extends K> keys, boolean forcePrimary, @@ -290,6 +373,54 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** + * @param key Key to load. + * @param readThrough Read through flag. + * @param forcePrimary Force get from primary node flag. + * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObj Keep cache objects flag. + * @return Load future. 
+ */ + public final IgniteInternalFuture<Object> loadAsync( + KeyCacheObject key, + boolean readThrough, + boolean forcePrimary, + AffinityTopologyVersion topVer, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals, + boolean canRemap, + boolean needVer, + boolean keepCacheObj + ) { + GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, + ctx.toCacheKeyObject(key), + topVer, + readThrough, + forcePrimary, + subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + needVer, + keepCacheObj); + + fut.init(); + + return fut; + } + + /** * @param keys Keys to load. * @param readThrough Read through flag. * @param forcePrimary Force get from primary node flag. @@ -299,9 +430,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. - * @return Loaded values. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObj Keep cache objects flag. + * @return Load future. */ - public IgniteInternalFuture<Map<K, V>> loadAsync( + public final IgniteInternalFuture<Map<K, V>> loadAsync( @Nullable Collection<KeyCacheObject> keys, boolean readThrough, boolean forcePrimary, @@ -931,24 +1065,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte } /** - * @param nodeId Sender ID. - * @param res Response. 
- */ - private void processGetResponse(UUID nodeId, GridNearGetResponse res) { - GridPartitionedGetFuture<K, V> fut = (GridPartitionedGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future( - res.version(), res.futureId()); - - if (fut == null) { - if (log.isDebugEnabled()) - log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); - - return; - } - - fut.onResult(nodeId, res); - } - - /** * @param nodeId Node ID. * @param res Response. */ @@ -957,7 +1073,7 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte assert res != null; GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc(). - <Boolean>future(res.version(), res.futureId()); + <Boolean>mvccFuture(res.version(), res.futureId()); if (fut != null) fut.onResult(nodeId, res); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index abeb509..8245d88 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -35,10 +35,11 @@ import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; import org.apache.ignite.internal.processors.cache.CacheObject; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import 
org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; @@ -78,7 +79,7 @@ import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; * Colocated cache lock future. */ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture<Boolean> - implements GridCacheFuture<Boolean> { + implements GridCacheMvccFuture<Boolean> { /** */ private static final long serialVersionUID = 0L; @@ -198,25 +199,16 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture valMap = new ConcurrentHashMap8<>(keys.size(), 1f); } - /** - * @return Participating nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - /** {@inheritDoc} */ @Override public GridCacheVersion version() { return lockVer; } + /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + return false; + } + /** * @return Future ID. */ @@ -538,7 +530,7 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture log.debug("Completing future: " + this); // Clean up. 
- cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); if (timeoutObj != null) cctx.time().removeTimeoutObject(timeoutObj); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java index 1559a91..c14621a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/CacheVersionedValue.java @@ -53,7 +53,7 @@ public class CacheVersionedValue implements Message { * @param val Cache value. * @param ver Cache version. 
*/ - CacheVersionedValue(CacheObject val, GridCacheVersion ver) { + public CacheVersionedValue(CacheObject val, GridCacheVersion ver) { this.val = val; this.ver = ver; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java index 3c3527a..eb0b637 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java @@ -52,6 +52,7 @@ import org.apache.ignite.internal.processors.cache.GridCacheValueCollection; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter; +import org.apache.ignite.internal.processors.cache.distributed.dht.CacheGetFuture; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition; @@ -292,8 +293,7 @@ public abstract class GridNearCacheAdapter<K, V> extends GridDistributedCacheAda * @param res Response. 
*/ protected void processGetResponse(UUID nodeId, GridNearGetResponse res) { - GridNearGetFuture<K, V> fut = (GridNearGetFuture<K, V>)ctx.mvcc().<Map<K, V>>future( - res.version(), res.futureId()); + CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId()); if (fut == null) { if (log.isDebugEnabled()) http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java index ae1d43c..dfaa44e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java @@ -170,24 +170,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } /** {@inheritDoc} */ - @Override public GridCacheVersion version() { - return ver; - } - - /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - return - F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<Map<K, V>>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<Map<K, V>> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; @@ -227,7 +209,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (super.onDone(res, err)) { // Don't forget to clean up. 
if (trackable) - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeFuture(futId); cache().dht().sendTtlUpdateRequest(expiryPlc); @@ -343,7 +325,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap if (!trackable) { trackable = true; - cctx.mvcc().addFuture(this); + cctx.mvcc().addFuture(this, futId); } MiniFuture fut = new MiniFuture(n, mappedKeys, saved, topVer); @@ -386,6 +368,7 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap * @param saved Reserved near cache entries. * @return Map. */ + @SuppressWarnings("unchecked") private Map<KeyCacheObject, GridNearCacheEntry> map( KeyCacheObject key, Map<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>> mappings, @@ -538,11 +521,17 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } else { K key0 = key.value(cctx.cacheObjectContext(), true); - V val0 = v.value(cctx.cacheObjectContext(), true); - - val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); key0 = (K)cctx.unwrapPortableIfNeeded(key0, !deserializePortable); + V val0; + + if (!skipVals) { + val0 = v.value(cctx.cacheObjectContext(), true); + val0 = (V)cctx.unwrapPortableIfNeeded(val0, !deserializePortable); + } + else + val0 = (V)Boolean.TRUE; + add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0))); } } @@ -618,28 +607,6 @@ public final class GridNearGetFuture<K, V> extends CacheDistributedGetFutureAdap } /** - * Affinity node to send get request to. - * - * @param key Key to get. - * @param topVer Topology version. - * @return Affinity node to get key from. 
- */ - private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) { - if (!canRemap) { - List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer); - - for (ClusterNode node : affNodes) { - if (cctx.discovery().alive(node)) - return node; - } - - return null; - } - else - return cctx.affinity().primary(key, topVer); - } - - /** * @return Near cache. */ private GridNearCacheAdapter<K, V> cache() { http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java index 8482217..6d60298 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetRequest.java @@ -133,7 +133,6 @@ public class GridNearGetRequest extends GridCacheMessage implements GridCacheDep ) { assert futId != null; assert miniId != null; - assert ver != null; assert keys != null; this.cacheId = cacheId; http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java index fc06ab1..15a791f 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetResponse.java @@ -100,8 +100,6 @@ public class GridNearGetResponse extends GridCacheMessage implements GridCacheDe boolean addDepInfo ) { assert futId != null; - assert miniId != null; - assert ver != null; this.cacheId = cacheId; this.futId = futId; http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java index 9c3701f..76f2fbe 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearLockFuture.java @@ -209,20 +209,6 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean valMap = new ConcurrentHashMap8<>(keys.size(), 1f); } - /** - * @return Participating nodes. - */ - @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - /** {@inheritDoc} */ @Override public GridCacheVersion version() { return lockVer; @@ -672,7 +658,7 @@ public final class GridNearLockFuture extends GridCompoundIdentityFuture<Boolean log.debug("Completing future: " + this); // Clean up. 
- cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); if (timeoutObj != null) cctx.time().removeTimeoutObject(timeoutObj); http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java index 1569b14..770c47a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticSerializableTxPrepareFuture.java @@ -73,8 +73,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; /** * */ -public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter - implements GridCacheMvccFuture<IgniteInternalTx> { +public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { /** */ public static final IgniteProductVersion SER_TX_SINCE = IgniteProductVersion.fromString("1.5.0"); @@ -148,18 +147,6 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim } /** {@inheritDoc} */ - @Override public Collection<? 
extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; @@ -287,7 +274,7 @@ public class GridNearOptimisticSerializableTxPrepareFuture extends GridNearOptim tx.setRollbackOnly(); // Don't forget to clean up. - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java index 82e3868..eaf476c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearOptimisticTxPrepareFuture.java @@ -35,7 +35,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; -import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; @@ -54,7 +53,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionTimeoutException; import org.jetbrains.annotations.Nullable; @@ -66,8 +64,7 @@ import static org.apache.ignite.transactions.TransactionState.PREPARING; /** * */ -public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter - implements GridCacheMvccFuture<IgniteInternalTx> { +public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepareFutureAdapter { /** */ @GridToStringInclude private Collection<IgniteTxKey> lockKeys = new GridConcurrentHashSet<>(); @@ -100,18 +97,6 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa } /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - if (isMini(f)) - return ((MiniFuture)f).node(); - - return cctx.discovery().localNode(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; @@ -261,7 +246,7 @@ public class GridNearOptimisticTxPrepareFuture extends GridNearOptimisticTxPrepa if (super.onDone(tx, err0)) { // Don't forget to clean up. 
- cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index 103105e..ffe5373 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -28,6 +28,8 @@ import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; +import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxMapping; @@ -42,7 +44,6 @@ import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; @@ -68,15 +69,6 @@ public 
class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } /** {@inheritDoc} */ - @Override public Collection<? extends ClusterNode> nodes() { - return F.viewReadOnly(futures(), new IgniteClosure<IgniteInternalFuture<?>, ClusterNode>() { - @Nullable @Override public ClusterNode apply(IgniteInternalFuture<?> f) { - return ((MiniFuture)f).node(); - } - }); - } - - /** {@inheritDoc} */ @Override public boolean onNodeLeft(UUID nodeId) { boolean found = false; @@ -280,6 +272,11 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA } /** {@inheritDoc} */ + @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { + return false; + } + + /** {@inheritDoc} */ @Override public boolean onDone(@Nullable IgniteInternalTx res, @Nullable Throwable err) { if (err != null) this.err.compareAndSet(null, err); @@ -290,7 +287,7 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA tx.state(PREPARED); if (super.onDone(tx, err)) { - cctx.mvcc().removeFuture(this); + cctx.mvcc().removeMvccFuture(this); return true; } http://git-wip-us.apache.org/repos/asf/ignite/blob/1f103067/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java new file mode 100644 index 0000000..a506007 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearSingleGetRequest.java @@ -0,0 +1,396 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ignite.internal.processors.cache.distributed.near;

import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheDeployable;
import org.apache.ignite.internal.processors.cache.GridCacheMessage;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.NotNull;

import static org.apache.ignite.internal.processors.cache.GridCacheUtils.SKIP_STORE_FLAG_MASK;

/**
 * Cache message describing a get request for a single key. Boolean options of the request
 * are packed into one {@code flags} byte using the {@code *_FLAG_MASK} constants of this class.
 */
public class GridNearSingleGetRequest extends GridCacheMessage implements GridCacheDeployable {
    /** */
    private static final long serialVersionUID = 0L;

    /** Bit of {@link #flags} holding the read-through option. */
    public static final int READ_THROUGH_FLAG_MASK = 0x01;

    /** Bit of {@link #flags} holding the skip-values option. */
    public static final int SKIP_VALS_FLAG_MASK = 0x02;

    /** Bit of {@link #flags} holding the add-reader option. */
    public static final int ADD_READER_FLAG_MASK = 0x04;

    /** Bit of {@link #flags} holding the need-version option. */
    public static final int NEED_VER_FLAG_MASK = 0x08;

    /**
     * Bit of {@link #flags} holding the need-entry-info option.
     * NOTE(review): the constructor of this class never sets this bit.
     */
    public static final int NEED_ENTRY_INFO_FLAG_MASK = 0x10;

    /** Future ID. */
    private IgniteUuid futId;

    /** Requested key. */
    private KeyCacheObject key;

    /** Flags (combination of the {@code *_FLAG_MASK} bits). */
    private byte flags;

    /** Topology version. */
    private AffinityTopologyVersion topVer;

    /** Subject ID. */
    private UUID subjId;

    /** Task name hash. */
    private int taskNameHash;

    /** TTL for read operation. */
    private long accessTtl;

    /**
     * Empty constructor required for {@link Message}.
     */
    public GridNearSingleGetRequest() {
        // No-op.
    }

    /**
     * @param cacheId Cache ID.
     * @param futId Future ID, must not be {@code null}.
     * @param key Key, must not be {@code null}.
     * @param readThrough Read through flag.
     * @param topVer Topology version.
     * @param subjId Subject ID.
     * @param taskNameHash Task name hash.
     * @param accessTtl New TTL to set after entry is accessed, -1 to leave unchanged.
     * @param skipVals Skip values flag. NOTE(review): the previous comment stated that when
     *      {@code false} only boolean values are returned indicating whether cache entry has a value;
     *      that reads inverted for a flag with this name - confirm against the request handler.
     * @param addReader Add reader flag.
     * @param needVer {@code True} if entry version is needed.
     * @param addDepInfo Deployment info.
     */
    public GridNearSingleGetRequest(
        int cacheId,
        IgniteUuid futId,
        KeyCacheObject key,
        boolean readThrough,
        @NotNull AffinityTopologyVersion topVer,
        UUID subjId,
        int taskNameHash,
        long accessTtl,
        boolean skipVals,
        boolean addReader,
        boolean needVer,
        boolean addDepInfo
    ) {
        assert futId != null;
        assert key != null;

        this.cacheId = cacheId;
        this.futId = futId;
        this.key = key;
        this.topVer = topVer;
        this.subjId = subjId;
        this.taskNameHash = taskNameHash;
        this.accessTtl = accessTtl;
        this.addDepInfo = addDepInfo;

        // Pack boolean options into the single flags byte.
        if (readThrough)
            flags |= READ_THROUGH_FLAG_MASK;

        if (skipVals)
            flags |= SKIP_VALS_FLAG_MASK;

        if (addReader)
            flags |= ADD_READER_FLAG_MASK;

        if (needVer)
            flags |= NEED_VER_FLAG_MASK;
    }

    /**
     * @return Key.
     */
    public KeyCacheObject key() {
        return key;
    }

    /**
     * @return Future ID.
     */
    public IgniteUuid futureId() {
        return futId;
    }

    /**
     * @return Subject ID.
     */
    public UUID subjectId() {
        return subjId;
    }

    /**
     * Gets task name hash.
     *
     * @return Task name hash.
     */
    public int taskNameHash() {
        return taskNameHash;
    }

    /**
     * @return Topology version.
     */
    @Override public AffinityTopologyVersion topologyVersion() {
        return topVer;
    }

    /**
     * @return New TTL to set after entry is accessed, -1 to leave unchanged.
     */
    public long accessTtl() {
        return accessTtl;
    }

    /**
     * @return Read through flag.
     */
    public boolean readThrough() {
        // Must test the same mask the constructor uses to store the flag
        // (previously tested the unrelated GridCacheUtils.SKIP_STORE_FLAG_MASK).
        return (flags & READ_THROUGH_FLAG_MASK) != 0;
    }

    /**
     * @return Skip values flag.
     */
    public boolean skipValues() {
        return (flags & SKIP_VALS_FLAG_MASK) != 0;
    }

    /**
     * @return Add reader flag.
     */
    public boolean addReader() {
        return (flags & ADD_READER_FLAG_MASK) != 0;
    }

    /**
     * @return {@code True} if entry version is needed.
     */
    public boolean needVersion() {
        return (flags & NEED_VER_FLAG_MASK) != 0;
    }

    /**
     * @return {@code True} if full entry information is needed.
     */
    public boolean needEntryInfo() {
        return (flags & NEED_ENTRY_INFO_FLAG_MASK) != 0;
    }

    /** {@inheritDoc} */
    @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException {
        super.prepareMarshal(ctx);

        assert key != null;

        GridCacheContext cctx = ctx.cacheContext(cacheId);

        prepareMarshalCacheObject(key, cctx);
    }

    /** {@inheritDoc} */
    @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException {
        super.finishUnmarshal(ctx, ldr);

        assert key != null;

        GridCacheContext cctx = ctx.cacheContext(cacheId);

        key.finishUnmarshal(cctx.cacheObjectContext(), ldr);
    }

    /** {@inheritDoc} */
    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
        reader.setBuffer(buf);

        if (!reader.beforeMessageRead())
            return false;

        if (!super.readFrom(buf, reader))
            return false;

        // Cases intentionally fall through: reading resumes at the current reader state and
        // continues field by field; the field order must mirror writeTo().
        switch (reader.state()) {
            case 3:
                accessTtl = reader.readLong("accessTtl");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 4:
                flags = reader.readByte("flags");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 5:
                futId = reader.readIgniteUuid("futId");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 6:
                key = reader.readMessage("key");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 7:
                subjId = reader.readUuid("subjId");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 8:
                taskNameHash = reader.readInt("taskNameHash");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

            case 9:
                topVer = reader.readMessage("topVer");

                if (!reader.isLastRead())
                    return false;

                reader.incrementState();

        }

        return reader.afterMessageRead(GridNearSingleGetRequest.class);
    }

    /** {@inheritDoc} */
    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
        writer.setBuffer(buf);

        if (!super.writeTo(buf, writer))
            return false;

        if (!writer.isHeaderWritten()) {
            if (!writer.writeHeader(directType(), fieldsCount()))
                return false;

            writer.onHeaderWritten();
        }

        // Cases intentionally fall through: writing resumes at the current writer state and
        // continues field by field; the field order must mirror readFrom().
        switch (writer.state()) {
            case 3:
                if (!writer.writeLong("accessTtl", accessTtl))
                    return false;

                writer.incrementState();

            case 4:
                if (!writer.writeByte("flags", flags))
                    return false;

                writer.incrementState();

            case 5:
                if (!writer.writeIgniteUuid("futId", futId))
                    return false;

                writer.incrementState();

            case 6:
                if (!writer.writeMessage("key", key))
                    return false;

                writer.incrementState();

            case 7:
                if (!writer.writeUuid("subjId", subjId))
                    return false;

                writer.incrementState();

            case 8:
                if (!writer.writeInt("taskNameHash", taskNameHash))
                    return false;

                writer.incrementState();

            case 9:
                if (!writer.writeMessage("topVer", topVer))
                    return false;

                writer.incrementState();

        }

        return true;
    }

    /** {@inheritDoc} */
    @Override public boolean addDeploymentInfo() {
        return addDepInfo;
    }

    /** {@inheritDoc} */
    @Override public byte directType() {
        return 116;
    }

    /** {@inheritDoc} */
    @Override public byte fieldsCount() {
        // This class handles seven fields (states 3-9); the remaining three are presumably
        // the fields of the superclass - TODO confirm against GridCacheMessage.
        return 10;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(GridNearSingleGetRequest.class, this);
    }
}