http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java index 0834e88..fcbf58d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedTxRemoteAdapter.java @@ -574,19 +574,42 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter // Nullify explicit version so that innerSet/innerRemove will work as usual. explicitVer = null; + GridCacheVersion dhtVer = cached.isNear() ? writeVersion() : null; + if (op == CREATE || op == UPDATE) { // Invalidate only for near nodes (backups cannot be invalidated). if (isSystemInvalidate() || (isInvalidate() && cacheCtx.isNear())) - cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, null, replicate ? DR_BACKUP : DR_NONE, + cached.innerRemove(this, + eventNodeId(), + nodeId, + false, + false, + true, + true, + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, near() ? null : explicitVer, CU.subjectId(this, cctx), - resolveTaskName()); + resolveTaskName(), + dhtVer); else { - cached.innerSet(this, eventNodeId(), nodeId, val, false, false, - txEntry.ttl(), true, true, topVer, null, - replicate ? DR_BACKUP : DR_NONE, txEntry.conflictExpireTime(), - near() ? null : explicitVer, CU.subjectId(this, cctx), - resolveTaskName()); + cached.innerSet(this, + eventNodeId(), + nodeId, + val, + false, + false, + txEntry.ttl(), + true, + true, + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + txEntry.conflictExpireTime(), + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer); // Keep near entry up to date. if (nearCached != null) { @@ -602,9 +625,20 @@ public class GridDistributedTxRemoteAdapter extends IgniteTxAdapter } } else if (op == DELETE) { - cached.innerRemove(this, eventNodeId(), nodeId, false, false, true, true, - topVer, null, replicate ? DR_BACKUP : DR_NONE, - near() ? null : explicitVer, CU.subjectId(this, cctx), resolveTaskName()); + cached.innerRemove(this, + eventNodeId(), + nodeId, + false, + false, + true, + true, + topVer, + null, + replicate ? DR_BACKUP : DR_NONE, + near() ? null : explicitVer, + CU.subjectId(this, cctx), + resolveTaskName(), + dhtVer); // Keep near entry up to date. if (nearCached != null)
http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java new file mode 100644 index 0000000..721ba4e --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/CacheDistributedGetFutureAdapter.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.dht; + +import java.util.Collection; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.lang.IgniteUuid; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; +import static org.apache.ignite.IgniteSystemProperties.getInteger; + +/** + * + */ +public abstract class CacheDistributedGetFutureAdapter<K, V> extends GridCompoundIdentityFuture<Map<K, V>> + implements GridCacheFuture<Map<K, V>> { + /** Default max remap count value. */ + public static final int DFLT_MAX_REMAP_CNT = 3; + + /** Maximum number of attempts to remap key to the same primary node. */ + protected static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT); + + /** Context. */ + protected final GridCacheContext<K, V> cctx; + + /** Keys. */ + protected Collection<KeyCacheObject> keys; + + /** Read through flag. */ + protected boolean readThrough; + + /** Force primary flag. */ + protected boolean forcePrimary; + + /** Future ID. */ + protected IgniteUuid futId; + + /** Trackable flag. */ + protected boolean trackable; + + /** Remap count. */ + protected AtomicInteger remapCnt = new AtomicInteger(); + + /** Subject ID. */ + protected UUID subjId; + + /** Task name. */ + protected String taskName; + + /** Whether to deserialize portable objects. */ + protected boolean deserializePortable; + + /** Skip values flag. */ + protected boolean skipVals; + + /** Expiry policy. */ + protected IgniteCacheExpiryPolicy expiryPlc; + + /** Flag indicating that get should be done on a locked topology version. */ + protected final boolean canRemap; + + /** */ + protected final boolean needVer; + + /** */ + protected final boolean keepCacheObjects; + + /** + * @param cctx Context. + * @param keys Keys. + * @param readThrough Read through flag. + * @param forcePrimary If {@code true} then will force network trip to primary node even + * if called on backup node. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObjects Keep cache objects flag. + */ + protected CacheDistributedGetFutureAdapter( + GridCacheContext<K, V> cctx, + Collection<KeyCacheObject> keys, + boolean readThrough, + boolean forcePrimary, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals, + boolean canRemap, + boolean needVer, + boolean keepCacheObjects + ) { + super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); + + assert !F.isEmpty(keys); + + this.cctx = cctx; + this.keys = keys; + this.readThrough = readThrough; + this.forcePrimary = forcePrimary; + this.subjId = subjId; + this.taskName = taskName; + this.deserializePortable = deserializePortable; + this.expiryPlc = expiryPlc; + this.skipVals = skipVals; + this.canRemap = canRemap; + this.needVer = needVer; + this.keepCacheObjects = keepCacheObjects; + + futId = IgniteUuid.randomUuid(); + } + + /** + * @param map Result map. + * @param key Key. + * @param val Value. + * @param ver Version. + */ + @SuppressWarnings("unchecked") + protected final void versionedResult(Map map, KeyCacheObject key, Object val, GridCacheVersion ver) { + assert needVer; + assert skipVals || val != null; + assert ver != null; + + map.put(key, new T2<>(skipVals ? true : val, ver)); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java index 9d02705..bdd1140 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java @@ -69,6 +69,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CI3; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -562,7 +563,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap /** * This method is used internally. Use - * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)} + * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)} * method instead to retrieve DHT value. * * @param keys {@inheritDoc} @@ -574,7 +575,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap @Nullable Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, - @Nullable GridCacheEntryEx entry, @Nullable UUID subjId, String taskName, boolean deserializePortable, @@ -585,7 +585,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), - null, /*don't check local tx. */false, subjId, taskName, @@ -603,9 +602,10 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param taskName Task name. * @param expiry Expiry policy. * @param skipVals Skip values flag. + * @param canRemap Can remap flag. * @return Get future. */ - IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync( + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> getDhtAllAsync( Collection<KeyCacheObject> keys, boolean readThrough, @Nullable UUID subjId, @@ -623,7 +623,8 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap expiry, skipVals, /*keep cache objects*/true, - canRemap); + canRemap, + /*need version*/true); } /** @@ -631,18 +632,17 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap * @param msgId Message ID. * @param keys Keys to get. * @param readThrough Read through flag. - * @param reload Reload flag. * @param topVer Topology version. * @param subjId Subject ID. * @param taskNameHash Task name hash code. * @param expiry Expiry policy. + * @param skipVals Skip values flag. * @return DHT future. */ public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, long msgId, LinkedHashMap<KeyCacheObject, Boolean> keys, boolean readThrough, - boolean reload, AffinityTopologyVersion topVer, @Nullable UUID subjId, int taskNameHash, @@ -653,7 +653,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap reader, keys, readThrough, - reload, /*tx*/null, topVer, subjId, @@ -672,6 +671,7 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap */ protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { assert ctx.affinityNode(); + assert !req.reload() : req; long ttl = req.accessTtl(); @@ -682,7 +682,6 @@ public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdap req.messageId(), req.keys(), req.readThrough(), - req.reload(), req.topologyVersion(), req.subjectId(), req.taskNameHash(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java index be2f3d3..1b2d834 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheEntry.java @@ -163,6 +163,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { * @param topVer Topology version. * @param threadId Owning thread ID. * @param ver Lock version. + * @param serOrder Version for serializable transactions ordering. + * @param serReadVer Optional read entry version for optimistic serializable transaction. * @param timeout Timeout to acquire lock. * @param reenter Reentry flag. * @param tx Tx flag. @@ -177,10 +179,17 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { AffinityTopologyVersion topVer, long threadId, GridCacheVersion ver, + @Nullable GridCacheVersion serOrder, + @Nullable GridCacheVersion serReadVer, long timeout, boolean reenter, boolean tx, - boolean implicitSingle) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { + boolean implicitSingle) + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException + { + assert serReadVer == null || serOrder != null; + assert !reenter || serOrder == null; + GridCacheMvccCandidate cand; GridCacheMvccCandidate prev; GridCacheMvccCandidate owner; @@ -213,6 +222,7 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { threadId, ver, timeout, + serOrder, reenter, tx, implicitSingle, @@ -235,12 +245,12 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { val = this.val; - if (mvcc != null && mvcc.isEmpty()) + if (mvcc.isEmpty()) mvccExtras(null); } // Don't link reentries. - if (cand != null && !cand.reentry()) + if (!cand.reentry()) // Link with other candidates in the same thread. cctx.mvcc().addNext(cctx, cand); @@ -250,7 +260,10 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { } /** {@inheritDoc} */ - @Override public boolean tmLock(IgniteInternalTx tx, long timeout) + @Override public boolean tmLock(IgniteInternalTx tx, + long timeout, + @Nullable GridCacheVersion serOrder, + GridCacheVersion serReadVer) throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { if (tx.local()) { GridDhtTxLocalAdapter dhtTx = (GridDhtTxLocalAdapter)tx; @@ -262,6 +275,8 @@ public class GridDhtCacheEntry extends GridDistributedCacheEntry { tx.topologyVersion(), tx.threadId(), tx.xidVersion(), + serOrder, + serReadVer, timeout, /*reenter*/false, /*tx*/true, http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java index a67b1de..7108da6 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java @@ -17,16 +17,16 @@ package org.apache.ignite.internal.processors.cache.distributed.dht; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.Iterator; import java.util.LinkedHashMap; -import java.util.LinkedList; import java.util.Map; import java.util.UUID; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; import org.apache.ignite.internal.processors.cache.CacheObject; @@ -45,6 +45,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.GridClosureException; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiClosure; @@ -72,9 +73,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col /** */ private UUID reader; - /** Reload flag. */ - private boolean reload; - /** Read through flag. */ private boolean readThrough; @@ -120,7 +118,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col * @param reader Reader. * @param keys Keys. * @param readThrough Read through flag. - * @param reload Reload flag. * @param tx Transaction. * @param topVer Topology version. * @param subjId Subject ID. @@ -134,7 +131,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col UUID reader, LinkedHashMap<KeyCacheObject, Boolean> keys, boolean readThrough, - boolean reload, @Nullable IgniteTxLocalEx tx, @NotNull AffinityTopologyVersion topVer, @Nullable UUID subjId, @@ -152,7 +148,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col this.msgId = msgId; this.keys = keys; this.readThrough = readThrough; - this.reload = reload; this.tx = tx; this.topVer = topVer; this.subjId = subjId; @@ -291,8 +286,6 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col return new GridFinishedFuture<Collection<GridCacheEntryInfo>>( Collections.<GridCacheEntryInfo>emptyList()); - final Collection<GridCacheEntryInfo> infos = new LinkedList<>(); - String taskName0 = cctx.kernalContext().job().currentTaskName(); if (taskName0 == null) @@ -302,89 +295,77 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col GridCompoundFuture<Boolean, Boolean> txFut = null; - for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) { - while (true) { - GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer); + ClusterNode readerNode = cctx.discovery().node(reader); - try { - GridCacheEntryInfo info = e.info(); + if (readerNode != null && !readerNode.isLocal() && cctx.discovery().cacheNearNode(readerNode, cctx.name())) { + for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) { + while (true) { + GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer); - // If entry is obsolete. - if (info == null) - continue; + try { + if (e.obsolete()) + continue; - boolean addReader = (!e.deleted() && k.getValue() && !skipVals); + boolean addReader = (!e.deleted() && k.getValue() && !skipVals); - if (addReader) - e.unswap(false); + if (addReader) + e.unswap(false); - // Register reader. If there are active transactions for this entry, - // then will wait for their completion before proceeding. - // TODO: GG-4003: - // TODO: What if any transaction we wait for actually removes this entry? - // TODO: In this case seems like we will be stuck with untracked near entry. - // TODO: To fix, check that reader is contained in the list of readers once - // TODO: again after the returned future completes - if not, try again. - // TODO: Also, why is info read before transactions are complete, and not after? - IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null; + // Register reader. If there are active transactions for this entry, + // then will wait for their completion before proceeding. + // TODO: GG-4003: + // TODO: What if any transaction we wait for actually removes this entry? + // TODO: In this case seems like we will be stuck with untracked near entry. + // TODO: To fix, check that reader is contained in the list of readers once + // TODO: again after the returned future completes - if not, try again. + IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null; - if (f != null) { - if (txFut == null) - txFut = new GridCompoundFuture<>(CU.boolReducer()); - - txFut.add(f); - } + if (f != null) { + if (txFut == null) + txFut = new GridCompoundFuture<>(CU.boolReducer()); - infos.add(info); + txFut.add(f); + } - break; - } - catch (IgniteCheckedException err) { - return new GridFinishedFuture<>(err); - } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry when getting a DHT value: " + e); - } - finally { - cctx.evicts().touch(e, topVer); + break; + } + catch (IgniteCheckedException err) { + return new GridFinishedFuture<>(err); + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry when getting a DHT value: " + e); + } + finally { + cctx.evicts().touch(e, topVer); + } } } - } - if (txFut != null) - txFut.markInitialized(); + if (txFut != null) + txFut.markInitialized(); + } - IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut; + IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> fut; if (txFut == null || txFut.isDone()) { - if (reload && cctx.readThrough() && cctx.store().configured()) { - fut = cache().reloadAllAsync0(keys.keySet(), - true, - skipVals, + if (tx == null) { + fut = cache().getDhtAllAsync( + keys.keySet(), + readThrough, subjId, - taskName); + taskName, + expiryPlc, + skipVals, + /*can remap*/true); } else { - if (tx == null) { - fut = cache().getDhtAllAsync( - keys.keySet(), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true); - } - else { - fut = tx.getAllAsync(cctx, - keys.keySet(), - null, - /*deserialize portable*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough); - } + fut = tx.getAllAsync(cctx, + keys.keySet(), + /*deserialize portable*/false, + skipVals, + /*keep cache objects*/true, + /*skip store*/!readThrough); } } else { @@ -393,38 +374,28 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col // transactions to complete. fut = new GridEmbeddedFuture<>( txFut, - new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() { - @Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) { + new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>>>() { + @Override public IgniteInternalFuture<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>> apply(Boolean b, Exception e) { if (e != null) throw new GridClosureException(e); - if (reload && cctx.readThrough() && cctx.store().configured()) { - return cache().reloadAllAsync0(keys.keySet(), - true, - skipVals, + if (tx == null) { + return cache().getDhtAllAsync( + keys.keySet(), + readThrough, subjId, - taskName); + taskName, + expiryPlc, + skipVals, + /*can remap*/true); } else { - if (tx == null) { - return cache().getDhtAllAsync( - keys.keySet(), - readThrough, - subjId, - taskName, - expiryPlc, - skipVals, - /*can remap*/true); - } - else { - return tx.getAllAsync(cctx, - keys.keySet(), - null, - /*deserialize portable*/false, - skipVals, - /*keep cache objects*/true, - /*skip store*/!readThrough); - } + return tx.getAllAsync(cctx, + keys.keySet(), + /*deserialize portable*/false, + skipVals, + /*keep cache objects*/true, + /*skip store*/!readThrough); } } } @@ -432,23 +403,29 @@ public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Col } return new GridEmbeddedFuture<>( - new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() { - @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) { + new C2<Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>>, Exception, Collection<GridCacheEntryInfo>>() { + @Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, T2<CacheObject, GridCacheVersion>> map, Exception e) { if (e != null) { onDone(e); return Collections.emptyList(); } else { - for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) { - GridCacheEntryInfo info = it.next(); + Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size()); - Object v = map.get(info.key()); + for (Map.Entry<KeyCacheObject, T2<CacheObject, GridCacheVersion>> entry : map.entrySet()) { + T2<CacheObject, GridCacheVersion> val = entry.getValue(); - if (v == null) - it.remove(); - else - info.value(skipVals ? null : (CacheObject)v); + assert val != null; + + GridCacheEntryInfo info = new GridCacheEntryInfo(); + + info.cacheId(cctx.cacheId()); + info.key(entry.getKey()); + info.value(skipVals ? null : val.get1()); + info.version(val.get2()); + + infos.add(info); } return infos; http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java index 4f3e97d..c175b0b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtLockFuture.java @@ -380,9 +380,10 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> * @return Lock candidate. * @throws GridCacheEntryRemovedException If entry was removed. * @throws GridDistributedLockCancelledException If lock is canceled. + * @throws IgniteCheckedException If failed. */ @Nullable public GridCacheMvccCandidate addEntry(GridDhtCacheEntry entry) - throws GridCacheEntryRemovedException, GridDistributedLockCancelledException { + throws GridCacheEntryRemovedException, GridDistributedLockCancelledException, IgniteCheckedException { if (log.isDebugEnabled()) log.debug("Adding entry: " + entry); @@ -400,6 +401,8 @@ public final class GridDhtLockFuture extends GridCompoundIdentityFuture<Boolean> topVer, threadId, lockVer, + null, + null, timeout, /*reenter*/false, inTx(), http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java index c09a611..4ce4759 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTransactionalCacheAdapter.java @@ -672,9 +672,9 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (log.isDebugEnabled()) log.debug("Got removed entry when adding lock (will retry): " + entry); } - catch (GridDistributedLockCancelledException e) { + catch (IgniteCheckedException | GridDistributedLockCancelledException e) { if (log.isDebugEnabled()) - log.debug("Got lock request for cancelled lock (will fail): " + entry); + log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); return new GridDhtFinishedFuture<>(e); } @@ -1106,62 +1106,55 @@ public abstract class GridDhtTransactionalCacheAdapter<K, V> extends GridDhtCach if (tx == null || !tx.isRollbackOnly()) { GridCacheVersion dhtVer = req.dhtVersion(i); - try { - GridCacheVersion ver = e.version(); - - boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); - - CacheObject val = null; - - if (ret) - val = e.innerGet(tx, - /*swap*/true, - /*read-through*/false, - /*fail-fast.*/false, - /*unmarshal*/false, - /*update-metrics*/true, - /*event notification*/req.returnValue(i), - /*temporary*/false, - CU.subjectId(tx, ctx.shared()), - null, - tx != null ? tx.resolveTaskName() : null, - null); - - assert e.lockedBy(mappedVer) || - (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) : - "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() + - ", entry=" + e + - ", mappedVer=" + mappedVer + ", ver=" + ver + - ", tx=" + tx + ", req=" + req + - ", err=" + err + ']'; - - boolean filterPassed = false; - - if (tx != null && tx.onePhaseCommit()) { - IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key())); - - assert writeEntry != null : - "Missing tx entry for locked cache entry: " + e; - - filterPassed = writeEntry.filtersPassed(); - } - - if (ret && val == null) - val = e.valueBytes(null); - - // We include values into response since they are required for local - // calls and won't be serialized. We are also including DHT version. - res.addValueBytes( - ret ? val : null, - filterPassed, - ver, - mappedVer); - } - catch (GridCacheFilterFailedException ex) { - assert false : "Filter should never fail if fail-fast is false."; + GridCacheVersion ver = e.version(); + + boolean ret = req.returnValue(i) || dhtVer == null || !dhtVer.equals(ver); + + CacheObject val = null; + + if (ret) + val = e.innerGet(tx, + /*swap*/true, + /*read-through*/false, + /*fail-fast.*/false, + /*unmarshal*/false, + /*update-metrics*/true, + /*event notification*/req.returnValue(i), + /*temporary*/false, + CU.subjectId(tx, ctx.shared()), + null, + tx != null ? tx.resolveTaskName() : null, + null); + + assert e.lockedBy(mappedVer) || + (ctx.mvcc().isRemoved(e.context(), mappedVer) && req.timeout() > 0) : + "Entry does not own lock for tx [locNodeId=" + ctx.localNodeId() + + ", entry=" + e + + ", mappedVer=" + mappedVer + ", ver=" + ver + + ", tx=" + tx + ", req=" + req + + ", err=" + err + ']'; + + boolean filterPassed = false; - ex.printStackTrace(); + if (tx != null && tx.onePhaseCommit()) { + IgniteTxEntry writeEntry = tx.entry(ctx.txKey(e.key())); + + assert writeEntry != null : + "Missing tx entry for locked cache entry: " + e; + + filterPassed = writeEntry.filtersPassed(); } + + if (ret && val == null) + val = e.valueBytes(null); + + // We include values into response since they are required for local + // calls and won't be serialized. We are also including DHT version. + res.addValueBytes( + ret ? val : null, + filterPassed, + ver, + mappedVer); } else { // We include values into response since they are required for local http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java index 4f8469f..44f34aa 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxLocal.java @@ -595,7 +595,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (finish(false) || state() == UNKNOWN) fut.finish(); else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + CU.txString(this))); + fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(this))); } catch (IgniteTxOptimisticCheckedException e) { if (log.isDebugEnabled()) @@ -627,7 +627,7 @@ public class GridDhtTxLocal extends GridDhtTxLocalAdapter implements GridCacheMa if (finish(false) || state() == UNKNOWN) fut.finish(); else - fut.onError(new IgniteCheckedException("Failed to commit transaction: " + + fut.onError(new IgniteCheckedException("Failed to rollback transaction: " + CU.txString(GridDhtTxLocal.this))); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java index a15a334..d806801 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtTxPrepareFuture.java @@ -42,7 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheMvccFuture; import org.apache.ignite.internal.processors.cache.GridCacheOperation; @@ -57,6 +56,7 @@ import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.dr.GridDrType; +import org.apache.ignite.internal.transactions.IgniteTxOptimisticCheckedException; import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridConcurrentHashSet; import org.apache.ignite.internal.util.GridLeanSet; @@ -429,9 +429,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter catch (GridCacheEntryRemovedException e) { assert false : "Got entry removed exception while holding transactional lock on entry: " + e; } - catch (GridCacheFilterFailedException e) { - assert false : "Got filter failed exception with fail fast false " + e; - } } } @@ -472,8 +469,18 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter if (log.isDebugEnabled()) log.debug("Marking all local candidates as ready: " + this); - Iterable<IgniteTxEntry> checkEntries = writes; + readyLocks(writes); + + if (tx.serializable() && tx.optimistic()) + readyLocks(reads); + locksReady = true; + } + + /** + * @param checkEntries Entries. + */ + private void readyLocks(Iterable<IgniteTxEntry> checkEntries) { for (IgniteTxEntry txEntry : checkEntries) { GridCacheContext cacheCtx = txEntry.context(); @@ -513,8 +520,6 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } } } - - locksReady = true; } /** @@ -813,12 +818,19 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter this.writes = writes; this.txNodes = txNodes; - if (!F.isEmpty(writes)) { + boolean ser = tx.serializable() && tx.optimistic(); + + if (!F.isEmpty(writes) || (ser && !F.isEmpty(reads))) { Map<Integer, Collection<KeyCacheObject>> forceKeys = null; for (IgniteTxEntry entry : writes) forceKeys = checkNeedRebalanceKeys(entry, forceKeys); + if (ser) { + for (IgniteTxEntry entry : reads) + forceKeys = checkNeedRebalanceKeys(entry, forceKeys); + } + forceKeysFut = forceRebalanceKeys(forceKeys); } @@ -847,7 +859,10 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter IgniteTxEntry e, Map<Integer, Collection<KeyCacheObject>> map ) { - if (retVal || !F.isEmpty(e.entryProcessors()) || !F.isEmpty(e.filters())) { + if (retVal || + !F.isEmpty(e.entryProcessors()) || + !F.isEmpty(e.filters()) || + e.serializableReadVersion() != null) { if (map == null) map = new HashMap<>(); @@ -906,10 +921,86 @@ public final class GridDhtTxPrepareFuture extends GridCompoundFuture<IgniteInter } /** + * @param entries Entries. + * @return Not null exception if version check failed. + * @throws IgniteCheckedException If failed. + */ + @Nullable private IgniteCheckedException checkReadConflict(Iterable<IgniteTxEntry> entries) + throws IgniteCheckedException { + try { + for (IgniteTxEntry entry : entries) { + GridCacheVersion serReadVer = entry.serializableReadVersion(); + + if (serReadVer != null) { + entry.cached().unswap(); + + if (!entry.cached().checkSerializableReadVersion(serReadVer)) + return versionCheckError(entry); + } + } + } + catch (GridCacheEntryRemovedException e) { + assert false : "Got removed exception on entry with dht local candidate: " + entries; + } + + return null; + } + + /** + * @param entry Entry. + * @return Optimistic version check error. + */ + private IgniteTxOptimisticCheckedException versionCheckError(IgniteTxEntry entry) { + GridCacheContext cctx = entry.context(); + + return new IgniteTxOptimisticCheckedException("Failed to prepare transaction, " + + "read/write conflict [key=" + entry.key().value(cctx.cacheObjectContext(), false) + + ", cache=" + cctx.name() + ']'); + } + + /** * */ private void prepare0() { try { + if (tx.serializable() && tx.optimistic()) { + IgniteCheckedException err0; + + try { + err0 = checkReadConflict(writes); + + if (err0 == null) + err0 = checkReadConflict(reads); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to check entry version: " + e, e); + + err0 = e; + } + + if (err0 != null) { + err.compareAndSet(null, err0); + + final GridNearTxPrepareResponse res = createPrepareResponse(); + + tx.rollbackAsync().listen(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { + @Override public void apply(IgniteInternalFuture<IgniteInternalTx> fut) { + if (GridDhtTxPrepareFuture.super.onDone(res, res.error())) { + try { + if (replied.compareAndSet(false, true)) + sendPrepareResponse(res); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send prepare response for transaction: " + tx, e); + } + } + } + }); + + return; + } + } + // We are holding transaction-level locks for entries here, so we can get next write version. onEntriesLocked(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java index a68e834..febe9ba 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java @@ -24,11 +24,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteLogger; -import org.apache.ignite.IgniteSystemProperties; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; @@ -39,8 +37,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; -import org.apache.ignite.internal.processors.cache.GridCacheFuture; import org.apache.ignite.internal.processors.cache.GridCacheMessage; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; @@ -49,7 +45,6 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetR import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; import org.apache.ignite.internal.util.tostring.GridToStringInclude; @@ -58,6 +53,7 @@ import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CIX1; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.P1; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -65,83 +61,30 @@ import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.Nullable; -import static org.apache.ignite.IgniteSystemProperties.IGNITE_NEAR_GET_MAX_REMAPS; - /** * Colocated get future. */ -public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<Map<K, V>> - implements GridCacheFuture<Map<K, V>> { +public class GridPartitionedGetFuture<K, V> extends CacheDistributedGetFutureAdapter<K, V> { /** */ private static final long serialVersionUID = 0L; - /** Default max remap count value. */ - public static final int DFLT_MAX_REMAP_CNT = 3; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); /** Logger. */ private static IgniteLogger log; - /** Maximum number of attempts to remap key to the same primary node. */ - private static final int MAX_REMAP_CNT = IgniteSystemProperties.getInteger(IGNITE_NEAR_GET_MAX_REMAPS, - DFLT_MAX_REMAP_CNT); - - /** Context. */ - private final GridCacheContext<K, V> cctx; - - /** Keys. */ - private Collection<KeyCacheObject> keys; - /** Topology version. */ private AffinityTopologyVersion topVer; - /** Reload flag. */ - private boolean reload; - - /** Read-through flag. */ - private boolean readThrough; - - /** Force primary flag. */ - private boolean forcePrimary; - - /** Future ID. */ - private IgniteUuid futId; - /** Version. */ private GridCacheVersion ver; - /** Trackable flag. */ - private volatile boolean trackable; - - /** Remap count. */ - private AtomicInteger remapCnt = new AtomicInteger(); - - /** Subject ID. */ - private UUID subjId; - - /** Task name. */ - private String taskName; - - /** Whether to deserialize portable objects. */ - private boolean deserializePortable; - - /** Expiry policy. */ - private IgniteCacheExpiryPolicy expiryPlc; - - /** Skip values flag. */ - private boolean skipVals; - - /** Flag indicating whether future can be remapped on a newer topology version. */ - private final boolean canRemap; - /** * @param cctx Context. * @param keys Keys. * @param topVer Topology version. * @param readThrough Read through flag. - * @param reload Reload flag. * @param forcePrimary If {@code true} then will force network trip to primary node even * if called on backup node. * @param subjId Subject ID. @@ -149,39 +92,39 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. + * @param canRemap Flag indicating whether future can be remapped on a newer topology version. + * @param needVer If {@code true} returns values as tuples containing value and version. + * @param keepCacheObjects Keep cache objects flag. */ public GridPartitionedGetFuture( GridCacheContext<K, V> cctx, Collection<KeyCacheObject> keys, AffinityTopologyVersion topVer, boolean readThrough, - boolean reload, boolean forcePrimary, @Nullable UUID subjId, String taskName, boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap + boolean canRemap, + boolean needVer, + boolean keepCacheObjects ) { - super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size())); - - assert !F.isEmpty(keys); + super(cctx, + keys, + readThrough, + forcePrimary, + subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + needVer, + keepCacheObjects); - this.cctx = cctx; - this.keys = keys; this.topVer = topVer; - this.readThrough = readThrough; - this.reload = reload; - this.forcePrimary = forcePrimary; - this.subjId = subjId; - this.deserializePortable = deserializePortable; - this.taskName = taskName; - this.expiryPlc = expiryPlc; - this.skipVals = skipVals; - this.canRemap = canRemap; - - futId = IgniteUuid.randomUuid(); ver = cctx.versions().next(); @@ -351,7 +294,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M -1, mappedKeys, readThrough, - reload, topVer, subjId, taskName == null ? 0 : taskName.hashCode(), @@ -404,7 +346,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M ver, mappedKeys, readThrough, - reload, topVer, subjId, taskName == null ? 0 : taskName.hashCode(), @@ -452,10 +393,10 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M boolean allowLocRead = !forcePrimary || cctx.affinity().primary(cctx.localNode(), key, topVer); while (true) { - GridCacheEntryEx entry = null; + GridCacheEntryEx entry; try { - if (!reload && allowLocRead) { + if (allowLocRead) { try { entry = colocated.context().isSwapOrOffheapEnabled() ? colocated.entryEx(key) : colocated.peekEx(key); @@ -464,18 +405,40 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M if (entry != null) { boolean isNew = entry.isNewLocked(); - CacheObject v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } colocated.context().evicts().touch(entry, topVer); @@ -485,7 +448,16 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M colocated.removeIfObsolete(key); } else { - cctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); + if (needVer) + versionedResult(locVals, key, v, ver); + else + cctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObjects, + deserializePortable, + true); return false; } @@ -536,14 +508,6 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M catch (GridCacheEntryRemovedException ignored) { // No-op, will retry. } - catch (GridCacheFilterFailedException e) { - if (log.isDebugEnabled()) - log.debug("Filter validation failed for entry: " + e); - - colocated.context().evicts().touch(entry, topVer); - - break; - } } return remote; @@ -591,7 +555,16 @@ public class GridPartitionedGetFuture<K, V> extends GridCompoundIdentityFuture<M for (GridCacheEntryInfo info : infos) { assert skipVals == (info.value() == null); - cctx.addResult(map, info.key(), info.value(), skipVals, false, deserializePortable, false); + if (needVer) + versionedResult(map, info.key(), info.value(), info.version()); + else + cctx.addResult(map, + info.key(), + info.value(), + skipVals, + keepCacheObjects, + deserializePortable, + false); } return map; http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java index cba6872..4cd9e84 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java @@ -306,7 +306,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { @Nullable final Collection<? extends K> keys, final boolean forcePrimary, boolean skipTx, - @Nullable final GridCacheEntryEx entry, @Nullable UUID subjId, final String taskName, final boolean deserializePortable, @@ -334,7 +333,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { @Override public IgniteInternalFuture<Map<K, V>> apply() { return getAllAsync0(ctx.cacheKeysView(keys), - false, forcePrimary, subjId0, taskName, @@ -920,7 +918,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * Entry point to all public API get methods. * * @param keys Keys to remove. - * @param reload Reload flag. * @param forcePrimary Force primary flag. * @param subjId Subject ID. * @param taskName Task name. @@ -931,7 +928,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { * @return Get future. */ private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys, - boolean reload, boolean forcePrimary, UUID subjId, String taskName, @@ -947,7 +943,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); // Optimisation: try to resolve value locally and escape 'get future' creation. - if (!reload && !forcePrimary) { + if (!forcePrimary) { Map<K, V> locVals = U.newHashMap(keys.size()); boolean success = true; @@ -997,10 +993,6 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { catch (GridCacheEntryRemovedException ignored) { // No-op, retry. } - catch (GridCacheFilterFailedException ignored) { - // No-op, skip the key. - break; - } catch (GridDhtInvalidPartitionException ignored) { success = false; @@ -1036,14 +1028,15 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { keys, topVer, !skipStore, - reload, forcePrimary, subjId, taskName, deserializePortable, expiry, skipVals, - canRemap); + canRemap, + false, + false); fut.init(); @@ -1663,6 +1656,7 @@ public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { if (idx != null) { GridDhtCacheEntry entry = entries.get(idx); + try { GridCacheVersion ver = entry.version(); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 6d69198..f03b461 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -36,7 +36,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; import org.apache.ignite.internal.processors.cache.GridCacheContext; import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; -import org.apache.ignite.internal.processors.cache.GridCacheFilterFailedException; import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; @@ -68,6 +67,7 @@ import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.C2; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; @@ -188,7 +188,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Nullable final Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, - @Nullable final GridCacheEntryEx entry, @Nullable UUID subjId, String taskName, final boolean deserializePortable, @@ -212,7 +211,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte @Override public IgniteInternalFuture<Map<K, V>> op(IgniteTxLocalAdapter tx) { return tx.getAllAsync(ctx, ctx.cacheKeysView(keys), - entry, deserializePortable, skipVals, false, @@ -230,7 +228,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte return loadAsync( ctx.cacheKeysView(keys), opCtx == null || !opCtx.skipStore(), - false, forcePrimary, topVer, subjId, @@ -257,7 +254,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte /** * @param keys Keys to load. * @param readThrough Read through flag. - * @param reload Reload flag. * @param forcePrimary Force get from primary node flag. * @param topVer Topology version. * @param subjId Subject ID. @@ -265,12 +261,12 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte * @param deserializePortable Deserialize portable flag. * @param expiryPlc Expiry policy. * @param skipVals Skip values flag. + * @param canRemap Can remap flag. * @return Loaded values. */ public IgniteInternalFuture<Map<K, V>> loadAsync( @Nullable Collection<KeyCacheObject> keys, boolean readThrough, - boolean reload, boolean forcePrimary, AffinityTopologyVersion topVer, @Nullable UUID subjId, @@ -278,7 +274,45 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte boolean deserializePortable, @Nullable IgniteCacheExpiryPolicy expiryPlc, boolean skipVals, - boolean canRemap + boolean canRemap) { + return loadAsync(keys, + readThrough, + forcePrimary, + topVer, subjId, + taskName, + deserializePortable, + expiryPlc, + skipVals, + canRemap, + false, + false); + } + + /** + * @param keys Keys to load. + * @param readThrough Read through flag. + * @param forcePrimary Force get from primary node flag. + * @param topVer Topology version. + * @param subjId Subject ID. + * @param taskName Task name. + * @param deserializePortable Deserialize portable flag. + * @param expiryPlc Expiry policy. + * @param skipVals Skip values flag. + * @return Loaded values. + */ + public IgniteInternalFuture<Map<K, V>> loadAsync( + @Nullable Collection<KeyCacheObject> keys, + boolean readThrough, + boolean forcePrimary, + AffinityTopologyVersion topVer, + @Nullable UUID subjId, + String taskName, + boolean deserializePortable, + @Nullable IgniteCacheExpiryPolicy expiryPlc, + boolean skipVals, + boolean canRemap, + boolean needVer, + boolean keepCacheObj ) { if (keys == null || keys.isEmpty()) return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); @@ -287,8 +321,8 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte expiryPlc = expiryPolicy(null); // Optimisation: try to resolve value locally and escape 'get future' creation. - if (!reload && !forcePrimary) { - Map<K, V> locVals = U.newHashMap(keys.size()); + if (!forcePrimary) { + Map<K, V> locVals = null; boolean success = true; @@ -304,18 +338,40 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (entry != null) { boolean isNew = entry.isNewLocked(); - CacheObject v = entry.innerGet(null, - /*swap*/true, - /*read-through*/false, - /*fail-fast*/true, - /*unmarshal*/true, - /**update-metrics*/false, - /*event*/!skipVals, - /*temporary*/false, - subjId, - null, - taskName, - expiryPlc); + CacheObject v = null; + GridCacheVersion ver = null; + + if (needVer) { + T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( + null, + /*swap*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + subjId, + null, + taskName, + expiryPlc); + + if (res != null) { + v = res.get1(); + ver = res.get2(); + } + } + else { + v = entry.innerGet(null, + /*swap*/true, + /*read-through*/false, + /*fail-fast*/true, + /*unmarshal*/true, + /**update-metrics*/false, + /*event*/!skipVals, + /*temporary*/false, + subjId, + null, + taskName, + expiryPlc); + } // Entry was not in memory or in swap, so we remove it from cache. if (v == null) { @@ -326,8 +382,22 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte success = false; } - else - ctx.addResult(locVals, key, v, skipVals, false, deserializePortable, true); + else { + if (locVals == null) + locVals = U.newHashMap(keys.size()); + + if (needVer) + locVals.put((K)key, (V)new T2<>((Object)(skipVals ? true : v), ver)); + else { + ctx.addResult(locVals, + key, + v, + skipVals, + keepCacheObj, + deserializePortable, + true); + } + } } else success = false; @@ -337,10 +407,6 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte catch (GridCacheEntryRemovedException ignored) { // No-op, retry. } - catch (GridCacheFilterFailedException ignored) { - // No-op, skip the key. - break; - } catch (GridDhtInvalidPartitionException ignored) { success = false; @@ -377,14 +443,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte keys, topVer, readThrough, - reload, forcePrimary, subjId, taskName, deserializePortable, expiryPlc, skipVals, - canRemap); + canRemap, + needVer, + keepCacheObj); fut.init(); @@ -803,10 +870,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (log.isDebugEnabled()) log.debug("Got removed entry when adding lock (will retry): " + entry); } - catch (GridDistributedLockCancelledException e) { + catch (IgniteCheckedException | GridDistributedLockCancelledException e) { if (log.isDebugEnabled()) - log.debug("Got lock request for cancelled lock (will ignore): " + - entry); + log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); fut.onError(e); http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java index 53c2b63..365b46b 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedLockFuture.java @@ -323,7 +323,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture inTx(), inTx() && tx.implicitSingle(), false, - false); + false, + null); cand.topologyVersion(topVer.get()); } @@ -342,7 +343,8 @@ public final class GridDhtColocatedLockFuture extends GridCompoundIdentityFuture inTx(), inTx() && tx.implicitSingle(), false, - false); + false, + null); cand.topologyVersion(topVer.get()); } http://git-wip-us.apache.org/repos/asf/ignite/blob/9d7543a8/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java index 82054d9..1bf03a9 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java @@ -385,7 +385,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { @Nullable Collection<? extends K> keys, boolean forcePrimary, boolean skipTx, - @Nullable GridCacheEntryEx entry, @Nullable UUID subjId, String taskName, boolean deserializePortable, @@ -406,7 +405,6 @@ public class GridNearAtomicCache<K, V> extends GridNearCacheAdapter<K, V> { return loadAsync(null, ctx.cacheKeysView(keys), - false, forcePrimary, subjId, taskName,
