http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java index 2c7bf8a..758f82c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalAdapter.java @@ -66,9 +66,7 @@ import org.apache.ignite.internal.processors.dr.GridDrType; import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; -import org.apache.ignite.internal.util.F0; import org.apache.ignite.internal.util.GridLeanMap; -import org.apache.ignite.internal.util.GridLeanSet; import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.future.GridFutureAdapter; @@ -123,18 +121,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** */ private static final long serialVersionUID = 0L; - /** Per-transaction read map. */ - @GridToStringInclude - protected Map<IgniteTxKey, IgniteTxEntry> txMap; - - /** Read view on transaction map. */ - @GridToStringExclude - protected IgniteTxMap readView; - - /** Write view on transaction map. */ - @GridToStringExclude - protected IgniteTxMap writeView; - /** Minimal version encountered (either explicit lock or XID of this transaction). */ protected GridCacheVersion minVer; @@ -156,9 +142,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** Commit error. */ protected AtomicReference<Throwable> commitErr = new AtomicReference<>(); - /** Active cache IDs. */ - protected Set<Integer> activeCacheIds = new HashSet<>(); - /** Need return value. */ protected boolean needRetVal; @@ -168,6 +151,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** Flag indicating whether deployment is enabled for caches from this transaction or not. */ private boolean depEnabled; + /** */ + @GridToStringInclude + protected IgniteTxLocalState txState; + /** * Empty constructor required for {@link Externalizable}. */ @@ -209,7 +196,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx, xidVer, implicit, - implicitSingle, /*local*/true, sys, plc, @@ -225,6 +211,13 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter ); minVer = xidVer; + + txState = implicitSingle ? new IgniteTxImplicitSingleStateImpl() : new IgniteTxStateImpl(); + } + + /** {@inheritDoc} */ + @Override public IgniteTxState txState() { + return txState; } /** @@ -246,7 +239,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public boolean empty() { - return txMap.isEmpty(); + return txState.empty(); } /** {@inheritDoc} */ @@ -267,16 +260,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public boolean onOwnerChanged(GridCacheEntryEx entry, GridCacheMvccCandidate owner) { assert false; - return false; - } - /** - * Gets collection of active cache IDs for this transaction. - * - * @return Collection of active cache IDs. - */ - @Override public Collection<Integer> activeCacheIds() { - return activeCacheIds; + return false; } /** {@inheritDoc} */ @@ -284,77 +269,70 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return depEnabled; } + /** + * @param depEnabled Flag indicating whether deployment is enabled for caches from this transaction or not. + */ + public void activeCachesDeploymentEnabled(boolean depEnabled) { + this.depEnabled = depEnabled; + } + /** {@inheritDoc} */ @Override public boolean isStarted() { - return txMap != null; + return txState.initialized(); } /** {@inheritDoc} */ @Override public boolean hasWriteKey(IgniteTxKey key) { - return writeView.containsKey(key); + return txState.hasWriteKey(key); } /** * @return Transaction read set. */ @Override public Set<IgniteTxKey> readSet() { - return txMap == null ? Collections.<IgniteTxKey>emptySet() : readView.keySet(); + return txState.readSet(); } /** * @return Transaction write set. */ @Override public Set<IgniteTxKey> writeSet() { - return txMap == null ? Collections.<IgniteTxKey>emptySet() : writeView.keySet(); - } - - /** {@inheritDoc} */ - @Override public boolean removed(IgniteTxKey key) { - if (txMap == null) - return false; - - IgniteTxEntry e = txMap.get(key); - - return e != null && e.op() == DELETE; + return txState.writeSet(); } /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> readMap() { - return readView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : readView; + return txState.readMap(); } /** {@inheritDoc} */ @Override public Map<IgniteTxKey, IgniteTxEntry> writeMap() { - return writeView == null ? Collections.<IgniteTxKey, IgniteTxEntry>emptyMap() : writeView; + return txState.writeMap(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> allEntries() { - return txMap == null ? Collections.<IgniteTxEntry>emptySet() : txMap.values(); + return txState.allEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> readEntries() { - return readView == null ? Collections.<IgniteTxEntry>emptyList() : readView.values(); + return txState.readEntries(); } /** {@inheritDoc} */ @Override public Collection<IgniteTxEntry> writeEntries() { - return writeView == null ? Collections.<IgniteTxEntry>emptyList() : writeView.values(); + return txState.writeEntries(); } /** {@inheritDoc} */ @Nullable @Override public IgniteTxEntry entry(IgniteTxKey key) { - return txMap == null ? null : txMap.get(key); + return txState.entry(key); } /** {@inheritDoc} */ @Override public void seal() { - if (readView != null) - readView.seal(); - - if (writeView != null) - writeView.seal(); + txState.seal(); } /** {@inheritDoc} */ @@ -409,7 +387,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter KeyCacheObject key, CacheEntryPredicate[] filter ) throws GridCacheFilterFailedException { - IgniteTxEntry e = txMap == null ? null : txMap.get(cacheCtx.txKey(key)); + IgniteTxEntry e = entry(cacheCtx.txKey(key)); if (e != null) { // We should look at tx entry previous value. If this is a user peek then previous @@ -652,7 +630,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!storeEnabled() || internal()) return; - Collection<CacheStoreManager> stores = stores(); + Collection<CacheStoreManager> stores = txState.stores(cctx); if (stores == null || stores.isEmpty()) return; @@ -854,7 +832,9 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter checkValid(); - boolean empty = F.isEmpty(near() ? txMap : writeMap()); + Collection<IgniteTxEntry> commitEntries = near() ? allEntries() : writeEntries(); + + boolean empty = F.isEmpty(commitEntries); // Register this transaction as completed prior to write-phase to // ensure proper lock ordering for removed entries. @@ -874,7 +854,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /* * Commit to cache. Note that for 'near' transaction we loop through all the entries. */ - for (IgniteTxEntry txEntry : (near() ? allEntries() : writeEntries())) { + for (IgniteTxEntry txEntry : commitEntries) { GridCacheContext cacheCtx = txEntry.context(); GridDrType drType = cacheCtx.isDrEnabled() ? DR_PRIMARY : DR_NONE; @@ -1282,7 +1262,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter cctx.tm().rollbackTx(this); if (!internal()) { - Collection<CacheStoreManager> stores = stores(); + Collection<CacheStoreManager> stores = txState.stores(cctx); if (stores != null && !stores.isEmpty()) { assert isWriteToStoreFromDhtValid(stores) : @@ -1582,25 +1562,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** - * Adds skipped key. - * - * @param skipped Skipped set (possibly {@code null}). - * @param key Key to add. - * @return Skipped set. - */ - private Set<KeyCacheObject> skip(Set<KeyCacheObject> skipped, KeyCacheObject key) { - if (skipped == null) - skipped = new GridLeanSet<>(); - - skipped.add(key); - - if (log.isDebugEnabled()) - log.debug("Added key to skipped set: " + key); - - return skipped; - } - - /** * Loads all missed keys for * {@link #getAllAsync(GridCacheContext, Collection, boolean, boolean, boolean, boolean)} method. * @@ -1954,6 +1915,24 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** {@inheritDoc} */ + @Override public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( + GridCacheContext cacheCtx, + K key, + V val, + boolean retval, + CacheEntryPredicate[] filter) { + return putAsync0(cacheCtx, key, val, null, null, retval, filter); + } + + /** {@inheritDoc} */ + @Override public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync(GridCacheContext cacheCtx, + K key, + EntryProcessor<K, V, Object> entryProcessor, + Object... invokeArgs) { + return (IgniteInternalFuture)putAsync0(cacheCtx, key, null, entryProcessor, invokeArgs, true, null); + } + + /** {@inheritDoc} */ @Override public IgniteInternalFuture<?> putAllDrAsync( GridCacheContext cacheCtx, Map<KeyCacheObject, GridCacheDrInfo> drMap @@ -2009,12 +1988,88 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param cacheKey Key to enlist. + * @param val Value. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param entryProcessor Entry processor (for invoke operation). + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Flag indicating whether a value should be returned. + * @param lockOnly If {@code true}, then entry will be enlisted as noop. + * @param filter User filters. + * @param ret Return value. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. + * @return Future for entry values loading. + */ + private <K, V> IgniteInternalFuture<Void> enlistWrite( + final GridCacheContext cacheCtx, + KeyCacheObject cacheKey, + Object val, + @Nullable ExpiryPolicy expiryPlc, + @Nullable EntryProcessor<K, V, Object> entryProcessor, + @Nullable Object[] invokeArgs, + final boolean retval, + boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + boolean skipStore, + final boolean singleRmv) { + try { + addActiveCache(cacheCtx); + + final boolean hasFilters = !F.isEmptyOrNulls(filter) && !F.isAlwaysTrue(filter); + final boolean needVal = singleRmv || retval || hasFilters; + final boolean needReadVer = needVal && (serializable() && optimistic()); + + if (entryProcessor != null) + transform = true; + + boolean loadMissed = enlistWriteEntry(cacheCtx, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + /*drVer*/null, + /*drTtl*/-1L, + /*drExpireTime*/-1L, + ret, + /*enlisted*/null, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer); + + if (loadMissed) { + return loadMissing(cacheCtx, + Collections.singleton(cacheKey), + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + skipStore, + retval); + } + + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } + + /** * Internal routine for <tt>putAll(..)</tt> * * @param cacheCtx Cache context. * @param keys Keys to enlist. * @param expiryPlc Explicitly specified expiry policy for entry. - * @param implicit Implicit flag. * @param lookup Value lookup map ({@code null} for remove). * @param invokeMap Map with entry processors for invoke operation. * @param invokeArgs Optional arguments for EntryProcessor. @@ -2027,13 +2082,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @param drRmvMap DR remove map (optional). * @param skipStore Skip store flag. * @param singleRmv {@code True} for single key remove operation ({@link Cache#remove(Object)}. - * @return Future with skipped keys (the ones that didn't pass filter for pessimistic transactions). + * @return Future for missing values loading. */ - private <K, V> IgniteInternalFuture<Set<KeyCacheObject>> enlistWrite( + private <K, V> IgniteInternalFuture<Void> enlistWrite( final GridCacheContext cacheCtx, Collection<?> keys, @Nullable ExpiryPolicy expiryPlc, - boolean implicit, @Nullable Map<?, ?> lookup, @Nullable Map<?, EntryProcessor<K, V, Object>> invokeMap, @Nullable Object[] invokeArgs, @@ -2056,8 +2110,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter return new GridFinishedFuture<>(e); } - Set<KeyCacheObject> skipped = null; - boolean rmv = lookup == null && invokeMap == null; Set<KeyCacheObject> missedForLoad = null; @@ -2115,345 +2167,441 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); - IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - - IgniteTxEntry txEntry = entry(txKey); - - // First time access. - if (txEntry == null) { - while (true) { - GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); - - try { - entry.unswap(false); - - // Check if lock is being explicitly acquired by the same thread. - if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && - entry.lockedByThread(threadId, xidVer)) - throw new IgniteCheckedException("Cannot access key within transaction if lock is " + - "externally held [key=" + key + ", entry=" + entry + ", xidVer=" + xidVer + - ", threadId=" + threadId + - ", locNodeId=" + cctx.localNodeId() + ']'); - - CacheObject old = null; - GridCacheVersion readVer = null; + boolean loadMissed = enlistWriteEntry(cacheCtx, + cacheKey, + val, + entryProcessor, + invokeArgs, + expiryPlc, + retval, + lockOnly, + filter, + drVer, + drTtl, + drExpireTime, + ret, + enlisted, + skipStore, + singleRmv, + hasFilters, + needVal, + needReadVer); + + if (loadMissed) { + if (missedForLoad == null) + missedForLoad = new HashSet<>(); + + missedForLoad.add(cacheKey); + } + } - if (optimistic() && !implicit()) { - try { - if (needReadVer) { - T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? - entry.innerGetVersioned(this, - /*swap*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null) : null; + if (missedForLoad != null) { + return loadMissing(cacheCtx, + missedForLoad, + filter, + ret, + needReadVer, + singleRmv, + hasFilters, + skipStore, + retval); + } - if (res != null) { - old = res.get1(); - readVer = res.get2(); - } - } - else { - old = entry.innerGet(this, - /*swap*/false, - /*read-through*/false, - /*fail-fast*/false, - /*unmarshal*/retval, - /*metrics*/retval, - /*events*/retval, - /*temporary*/false, - CU.subjectId(this, cctx), - entryProcessor, - resolveTaskName(), - null); - } - } - catch (ClusterTopologyCheckedException e) { - entry.context().evicts().touch(entry, topologyVersion()); + return new GridFinishedFuture<>(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } + } - throw e; - } - } - else - old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); + /** + * @param cacheCtx Cache context. + * @param keys Keys to load. + * @param ret Return value. + * @param needReadVer Read version flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param skipStore Skip store flag. + * @param retval Return value flag. + * @return Load future. + */ + private IgniteInternalFuture<Void> loadMissing( + final GridCacheContext cacheCtx, + final Set<KeyCacheObject> keys, + final CacheEntryPredicate[] filter, + final GridCacheReturn ret, + final boolean needReadVer, + final boolean singleRmv, + final boolean hasFilters, + final boolean skipStore, + final boolean retval) { + GridInClosure3<KeyCacheObject, Object, GridCacheVersion> c = + new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { + @Override public void apply(KeyCacheObject key, + @Nullable Object val, + @Nullable GridCacheVersion loadVer) { + if (log.isDebugEnabled()) + log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); - if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { - skipped = skip(skipped, cacheKey); + IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); - ret.set(cacheCtx, old, false); + assert e != null; - if (!readCommitted()) { - // Enlist failed filters as reads for non-read-committed mode, - // so future ops will get the same values. - txEntry = addEntry(READ, - old, - null, - null, - entry, - null, - CU.empty0(), - false, - -1L, - -1L, - null, - skipStore); + if (needReadVer) { + assert loadVer != null; - txEntry.markValid(); + e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); + } - if (needReadVer) { - assert readVer != null; + if (singleRmv) { + assert !hasFilters && !retval; + assert val == null || Boolean.TRUE.equals(val) : val; - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } - } + ret.set(cacheCtx, null, val != null); + } + else { + CacheObject cacheVal = cacheCtx.toCacheObject(val); - if (readCommitted()) - cacheCtx.evicts().touch(entry, topologyVersion()); + if (e.op() == TRANSFORM) { + GridCacheVersion ver; - break; // While. + try { + ver = e.cached().version(); } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : e; - final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : - entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore); + ver = null; + } - if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) - cacheCtx.evicts().touch(entry, topologyVersion()); + addInvokeResult(e, cacheVal, ret, ver); + } + else { + boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); - enlisted.add(cacheKey); + ret.set(cacheCtx, cacheVal, success); + } + } + } + }; - if (!pessimistic() && !implicit()) { - txEntry.markValid(); + return loadMissing( + cacheCtx, + /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, + /*async*/true, + keys, + /*skipVals*/singleRmv, + needReadVer, + c); + } - if (old == null) { - if (needVal) { - if (missedForLoad == null) - missedForLoad = new HashSet<>(); + /** + * @param cacheCtx Cache context. + * @param cacheKey Key. + * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param expiryPlc Explicitly specified expiry policy for entry. + * @param retval Return value flag. + * @param lockOnly + * @param filter Filter. + * @param drVer DR version. + * @param drTtl DR ttl. + * @param drExpireTime DR expire time. + * @param ret Return value. + * @param enlisted Enlisted keys collection. + * @param skipStore Skip store flag. + * @param singleRmv {@code True} for single remove operation. + * @param hasFilters {@code True} if filters not empty. + * @param needVal {@code True} if value is needed. + * @param needReadVer {@code True} if need read entry version. + * @return {@code True} if entry value should be loaded. + * @throws IgniteCheckedException If failed. + */ + private boolean enlistWriteEntry(GridCacheContext cacheCtx, + final KeyCacheObject cacheKey, + final @Nullable Object val, + final @Nullable EntryProcessor<?, ?, ?> entryProcessor, + final @Nullable Object[] invokeArgs, + final @Nullable ExpiryPolicy expiryPlc, + final boolean retval, + final boolean lockOnly, + final CacheEntryPredicate[] filter, + final GridCacheVersion drVer, + final long drTtl, + long drExpireTime, + final GridCacheReturn ret, + @Nullable final Collection<KeyCacheObject> enlisted, + boolean skipStore, + boolean singleRmv, + boolean hasFilters, + final boolean needVal, + boolean needReadVer + ) throws IgniteCheckedException { + boolean loadMissed = false; - missedForLoad.add(cacheKey); - } - else { - assert !implicit() || !transform : this; - assert txEntry.op() != TRANSFORM : txEntry; + final boolean rmv = val == null && entryProcessor == null; - if (retval) - ret.set(cacheCtx, null, true); - else - ret.success(true); - } - } - else { - if (needReadVer) { - assert readVer != null; + IgniteTxKey txKey = cacheCtx.txKey(cacheKey); - txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); - } + IgniteTxEntry txEntry = entry(txKey); - if (retval && !transform) - ret.set(cacheCtx, old, true); - else { - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; + // First time access. + if (txEntry == null) { + while (true) { + GridCacheEntryEx entry = entryEx(cacheCtx, txKey, topologyVersion()); - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : txEntry; + try { + entry.unswap(false); + + // Check if lock is being explicitly acquired by the same thread. + if (!implicit && cctx.kernalContext().config().isCacheSanityCheckEnabled() && + entry.lockedByThread(threadId, xidVer)) { + throw new IgniteCheckedException("Cannot access key within transaction if lock is " + + "externally held [key=" + CU.value(cacheKey, cacheCtx, false) + + ", entry=" + entry + + ", xidVer=" + xidVer + + ", threadId=" + threadId + + ", locNodeId=" + cctx.localNodeId() + ']'); + } - if (log.isDebugEnabled()) - log.debug("Failed to get entry version " + - "[err=" + ex.getMessage() + ']'); + CacheObject old = null; + GridCacheVersion readVer = null; - ver = null; - } + if (optimistic() && !implicit()) { + try { + if (needReadVer) { + T2<CacheObject, GridCacheVersion> res = primaryLocal(entry) ? + entry.innerGetVersioned(this, + /*swap*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null) : null; - addInvokeResult(txEntry, old, ret, ver); - } - else - ret.success(true); - } + if (res != null) { + old = res.get1(); + readVer = res.get2(); } } - // Pessimistic. else { - if (retval && !transform) - ret.set(cacheCtx, old, true); - else - ret.success(true); + old = entry.innerGet(this, + /*swap*/false, + /*read-through*/false, + /*fail-fast*/false, + /*unmarshal*/retval, + /*metrics*/retval, + /*events*/retval, + /*temporary*/false, + CU.subjectId(this, cctx), + entryProcessor, + resolveTaskName(), + null); } - - break; // While. } - catch (GridCacheEntryRemovedException ignore) { - if (log.isDebugEnabled()) - log.debug("Got removed entry in transaction putAll0 method: " + entry); + catch (ClusterTopologyCheckedException e) { + entry.context().evicts().touch(entry, topologyVersion()); + + throw e; } } - } - else { - if (entryProcessor == null && txEntry.op() == TRANSFORM) - throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + - "transaction after EntryProcessor is applied): " + key); + else + old = retval ? entry.rawGetOrUnmarshal(false) : entry.rawGet(); - GridCacheEntryEx entry = txEntry.cached(); + if (old != null && hasFilters && !filter(entry.context(), cacheKey, old, filter)) { + ret.set(cacheCtx, old, false); - CacheObject v = txEntry.value(); - - boolean del = txEntry.op() == DELETE && rmv; + if (!readCommitted()) { + // Enlist failed filters as reads for non-read-committed mode, + // so future ops will get the same values. + txEntry = addEntry(READ, + old, + null, + null, + entry, + null, + CU.empty0(), + false, + -1L, + -1L, + null, + skipStore); - if (!del) { - if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { - skipped = skip(skipped, cacheKey); + txEntry.markValid(); - ret.set(cacheCtx, v, false); + if (needReadVer) { + assert readVer != null; - continue; + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); + } } - GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : - v != null ? UPDATE : CREATE; + if (readCommitted()) + cacheCtx.evicts().touch(entry, topologyVersion()); - txEntry = addEntry(op, - cacheCtx.toCacheObject(val), - entryProcessor, - invokeArgs, - entry, - expiryPlc, - filter, - true, - drTtl, - drExpireTime, - drVer, - skipStore); + break; // While. + } + + final GridCacheOperation op = lockOnly ? NOOP : rmv ? DELETE : + entryProcessor != null ? TRANSFORM : old != null ? UPDATE : CREATE; + + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore); + if (!implicit() && readCommitted() && !cacheCtx.offheapTiered()) + cacheCtx.evicts().touch(entry, topologyVersion()); + + if (enlisted != null) enlisted.add(cacheKey); - if (txEntry.op() == TRANSFORM) { - GridCacheVersion ver; + if (!pessimistic() && !implicit()) { + txEntry.markValid(); - try { - ver = entry.version(); - } - catch (GridCacheEntryRemovedException e) { - assert optimistic() : txEntry; + if (old == null) { + if (needVal) + loadMissed = true; + else { + assert !implicit() || !transform : this; + assert txEntry.op() != TRANSFORM : txEntry; - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); + if (retval) + ret.set(cacheCtx, null, true); + else + ret.success(true); + } + } + else { + if (needReadVer) { + assert readVer != null; - ver = null; + txEntry.serializableReadVersion(singleRmv ? SER_READ_NOT_EMPTY_VER : readVer); } - addInvokeResult(txEntry, txEntry.value(), ret, ver); - } - } + if (retval && !transform) + ret.set(cacheCtx, old, true); + else { + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; - if (!pessimistic()) { - txEntry.markValid(); + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException ex) { + assert optimistic() : txEntry; + + if (log.isDebugEnabled()) + log.debug("Failed to get entry version " + + "[err=" + ex.getMessage() + ']'); + + ver = null; + } + addInvokeResult(txEntry, old, ret, ver); + } + else + ret.success(true); + } + } + } + // Pessimistic. + else { if (retval && !transform) - ret.set(cacheCtx, v, true); + ret.set(cacheCtx, old, true); else ret.success(true); } - } + + break; // While. + } + catch (GridCacheEntryRemovedException ignore) { + if (log.isDebugEnabled()) + log.debug("Got removed entry in transaction putAll0 method: " + entry); + } } } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - - if (missedForLoad != null) { - final boolean skipVals = singleRmv; - - IgniteInternalFuture<Void> fut = loadMissing( - cacheCtx, - /*read through*/cacheCtx.config().isLoadPreviousValue() && !skipStore, - /*async*/true, - missedForLoad, - skipVals, - needReadVer, - new GridInClosure3<KeyCacheObject, Object, GridCacheVersion>() { - @Override public void apply(KeyCacheObject key, - @Nullable Object val, - @Nullable GridCacheVersion loadVer) { - if (log.isDebugEnabled()) - log.debug("Loaded value from remote node [key=" + key + ", val=" + val + ']'); + else { + if (entryProcessor == null && txEntry.op() == TRANSFORM) + throw new IgniteCheckedException("Failed to enlist write value for key (cannot have update value in " + + "transaction after EntryProcessor is applied): " + CU.value(cacheKey, cacheCtx, false)); - IgniteTxEntry e = entry(new IgniteTxKey(key, cacheCtx.cacheId())); + GridCacheEntryEx entry = txEntry.cached(); - assert e != null; + CacheObject v = txEntry.value(); - if (needReadVer) { - assert loadVer != null; + boolean del = txEntry.op() == DELETE && rmv; - e.serializableReadVersion(singleRmv && val != null ? SER_READ_NOT_EMPTY_VER : loadVer); - } + if (!del) { + if (hasFilters && !filter(entry.context(), cacheKey, v, filter)) { + ret.set(cacheCtx, v, false); - if (singleRmv) { - assert !hasFilters && !retval; - assert val == null || Boolean.TRUE.equals(val) : val; + return loadMissed; + } - ret.set(cacheCtx, null, val != null); - } - else { - CacheObject cacheVal = cacheCtx.toCacheObject(val); + GridCacheOperation op = rmv ? DELETE : entryProcessor != null ? TRANSFORM : + v != null ? UPDATE : CREATE; - if (e.op() == TRANSFORM) { - GridCacheVersion ver; + txEntry = addEntry(op, + cacheCtx.toCacheObject(val), + entryProcessor, + invokeArgs, + entry, + expiryPlc, + filter, + true, + drTtl, + drExpireTime, + drVer, + skipStore); - try { - ver = e.cached().version(); - } - catch (GridCacheEntryRemovedException ex) { - assert optimistic() : e; + if (enlisted != null) + enlisted.add(cacheKey); - if (log.isDebugEnabled()) - log.debug("Failed to get entry version: [msg=" + ex.getMessage() + ']'); + if (txEntry.op() == TRANSFORM) { + GridCacheVersion ver; - ver = null; - } + try { + ver = entry.version(); + } + catch (GridCacheEntryRemovedException e) { + assert optimistic() : txEntry; - addInvokeResult(e, cacheVal, ret, ver); - } - else { - boolean success = !hasFilters || isAll(e.context(), key, cacheVal, filter); + if (log.isDebugEnabled()) + log.debug("Failed to get entry version: [msg=" + e.getMessage() + ']'); - ret.set(cacheCtx, cacheVal, success); - } - } + ver = null; } - }); - return new GridEmbeddedFuture<>( - new C2<Void, Exception, Set<KeyCacheObject>>() { - @Override public Set<KeyCacheObject> apply(Void b, Exception e) { - if (e != null) - throw new GridClosureException(e); + addInvokeResult(txEntry, txEntry.value(), ret, ver); + } + } - return Collections.emptySet(); - } - }, fut - ); + if (!pessimistic()) { + txEntry.markValid(); + + if (retval && !transform) + ret.set(cacheCtx, v, true); + else + ret.success(true); + } } - return new GridFinishedFuture<>(skipped); + return loadMissed; } /** @@ -2486,22 +2634,19 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * * @param cacheCtx Context. * @param keys Keys. - * @param failed Collection of potentially failed keys (need to populate in this method). * @param ret Return value. * @param rmv {@code True} if remove. * @param retval Flag to return value or not. * @param read {@code True} if read. * @param accessTtl TTL for read operation. * @param filter Filter to check entries. - * @return Failed keys. * @throws IgniteCheckedException If error. * @param computeInvoke If {@code true} computes return value for invoke operation. */ @SuppressWarnings("unchecked") - protected Set<KeyCacheObject> postLockWrite( + protected final void postLockWrite( GridCacheContext cacheCtx, Iterable<KeyCacheObject> keys, - Set<KeyCacheObject> failed, GridCacheReturn ret, boolean rmv, boolean retval, @@ -2606,8 +2751,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter log.debug("Filter passed in post lock for key: " + k); } else { - failed = skip(failed, k); - // Revert operation to previous. (if no - NOOP, so entry will be unlocked). txEntry.setAndMarkValid(txEntry.previousOperation(), cacheCtx.toCacheObject(ret.value())); txEntry.filters(CU.empty0()); @@ -2638,11 +2781,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } } - - if (log.isDebugEnabled()) - log.debug("Entries that failed after lock filter check: " + failed); - - return failed; } /** @@ -2696,6 +2834,144 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } /** + * @param cacheCtx Cache context. + * @param retval Return value flag. + * @throws IgniteCheckedException If failed. + */ + private void beforePut(GridCacheContext cacheCtx, boolean retval) throws IgniteCheckedException { + checkUpdatesAllowed(cacheCtx); + + cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT); + + if (retval) + needReturnValue(true); + + checkValid(); + + init(); + } + + /** + * Internal method for single update operation. + * + * @param cacheCtx Cache context. + * @param key Key. + * @param val Value. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for EntryProcessor. + * @param retval Return value flag. + * @param filter Filter. + * @return Operation future. + */ + private <K, V> IgniteInternalFuture putAsync0( + final GridCacheContext cacheCtx, + K key, + @Nullable V val, + @Nullable EntryProcessor entryProcessor, + @Nullable final Object[] invokeArgs, + final boolean retval, + @Nullable final CacheEntryPredicate[] filter + ) { + assert key != null; + + try { + beforePut(cacheCtx, retval); + + final GridCacheReturn ret = new GridCacheReturn(localResult(), false); + + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + + KeyCacheObject cacheKey = cacheCtx.toCacheKeyObject(key); + + final IgniteInternalFuture<Void> loadFut = enlistWrite( + cacheCtx, + cacheKey, + val, + opCtx != null ? opCtx.expiry() : null, + entryProcessor, + invokeArgs, + retval, + /*lockOnly*/false, + filter, + ret, + opCtx != null && opCtx.skipStore(), + /*singleRmv*/false); + + if (pessimistic()) { + assert loadFut == null || loadFut.isDone() : loadFut; + + final Collection<KeyCacheObject> enlisted = Collections.singleton(cacheKey); + + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for put on key: " + enlisted); + + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L); + + PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { + @Override public GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for put on keys: " + enlisted); + + postLockWrite(cacheCtx, + enlisted, + ret, + /*remove*/false, + retval, + /*read*/false, + -1L, + filter, + /*computeInvoke*/true); + + return ret; + } + }; + + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { + try { + return nonInterruptable(plc1.apply(false, e)); + } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); + } + } + } + else { + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + } + else + return optimisticPutFuture(loadFut, ret); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture(e); + } + catch (RuntimeException e) { + onException(); + + throw e; + } + } + + /** * Internal method for all put and transform operations. Only one of {@code map}, {@code transformMap} * maps must be non-null. * @@ -2721,17 +2997,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert filter == null || invokeMap == null; try { - checkUpdatesAllowed(cacheCtx); + beforePut(cacheCtx, retval); } catch (IgniteCheckedException e) { return new GridFinishedFuture(e); } - cacheCtx.checkSecurity(SecurityPermission.CACHE_PUT); - - if (retval) - needReturnValue(true); - // Cached entry may be passed only from entry wrapper. final Map<?, ?> map0; final Map<?, EntryProcessor<K, V, Object>> invokeMap0; @@ -2757,15 +3028,6 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert map0 != null || invokeMap0 != null; - try { - checkValid(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } - - init(); - final GridCacheReturn ret = new GridCacheReturn(localResult(), false); if (F.isEmpty(map0) && F.isEmpty(invokeMap0)) { @@ -2783,15 +3045,14 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter try { Set<?> keySet = map0 != null ? map0.keySet() : invokeMap0.keySet(); - Collection<KeyCacheObject> enlisted = new ArrayList<>(); + final Collection<KeyCacheObject> enlisted = new ArrayList<>(); CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite( + final IgniteInternalFuture<Void> loadFut = enlistWrite( cacheCtx, keySet, opCtx != null ? opCtx.expiry() : null, - implicit, map0, invokeMap0, invokeArgs, @@ -2806,15 +3067,12 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter false); if (pessimistic()) { - // Loose all skipped. - final Set<KeyCacheObject> loaded = loadFut.get(); - - final Collection<KeyCacheObject> keys = F.view(enlisted, F0.notIn(loaded)); + assert loadFut == null || loadFut.isDone() : loadFut; if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock for put on keys: " + keys); + log.debug("Before acquiring transaction lock for put on keys: " + enlisted); - IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(keys, + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, lockTimeout(), this, false, @@ -2828,11 +3086,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter throws IgniteCheckedException { if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for put on keys: " + keys); + log.debug("Acquired transaction lock for put on keys: " + enlisted); postLockWrite(cacheCtx, - keys, - loaded, + enlisted, ret, /*remove*/false, retval, @@ -2861,64 +3118,79 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter } } } - else + else { return nonInterruptable(new GridEmbeddedFuture<>( fut, plc1 )); + } } - else { - if (implicit()) { - // Should never load missing values for implicit transaction as values will be returned - // with prepare response, if required. - assert loadFut.isDone(); + else + return optimisticPutFuture(loadFut, ret); + } + catch (RuntimeException e) { + onException(); - try { - loadFut.get(); - } - catch (IgniteCheckedException e) { - return new GridFinishedFuture<>(e); - } + throw e; + } + } - return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) throws IgniteCheckedException { - try { - txFut.get(); + /** + * @param loadFut Missing keys load future. + * @param ret Future result. + * @return Future. + */ + private IgniteInternalFuture optimisticPutFuture(IgniteInternalFuture<Void> loadFut, final GridCacheReturn ret) { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return implicitRes; - } - catch (IgniteCheckedException | RuntimeException e) { - rollbackAsync(); + try { + loadFut.get(); + } + catch (IgniteCheckedException e) { + return new GridFinishedFuture<>(e); + } - throw e; - } + return nonInterruptable(commitAsync().chain( + new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) + throws IgniteCheckedException { + try { + txFut.get(); + + return implicitRes; } - })); - } - else - return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) throws IgniteCheckedException { - f.get(); + catch (IgniteCheckedException | RuntimeException e) { + rollbackAsync(); - return ret; + throw e; } - })); - } + } + } + )); } - catch (RuntimeException e) { - for (IgniteTxEntry txEntry : txMap.values()) { - GridCacheEntryEx cached0 = txEntry.cached(); - - if (cached0 != null) - txEntry.context().evicts().touch(cached0, topologyVersion()); - } + else { + return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) throws IgniteCheckedException { + f.get(); - throw e; + return ret; + } + })); } - catch (IgniteCheckedException e) { - setRollbackOnly(); + } - return new GridFinishedFuture<>(e); + /** + * + */ + private void onException() { + for (IgniteTxEntry txEntry : allEntries()) { + GridCacheEntryEx cached0 = txEntry.cached(); + + if (cached0 != null) + txEntry.context().evicts().touch(cached0, topologyVersion()); } } @@ -2974,9 +3246,10 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter assert keys0 != null; - if (log.isDebugEnabled()) + if (log.isDebugEnabled()) { log.debug("Called removeAllAsync(...) [tx=" + this + ", keys=" + keys0 + ", implicit=" + implicit + ", retval=" + retval + "]"); + } try { checkValid(); @@ -3002,140 +3275,131 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter init(); - try { - Collection<KeyCacheObject> enlisted = new ArrayList<>(); - - CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); + final Collection<KeyCacheObject> enlisted = new ArrayList<>(); - ExpiryPolicy plc; + CacheOperationContext opCtx = cacheCtx.operationContextPerCall(); - if (!F.isEmpty(filter)) - plc = opCtx != null ? opCtx.expiry() : null; - else - plc = null; + ExpiryPolicy plc; - final IgniteInternalFuture<Set<KeyCacheObject>> loadFut = enlistWrite( - cacheCtx, - keys0, - plc, - implicit, - /** lookup map */null, - /** invoke map */null, - /** invoke arguments */null, - retval, - /** lock only */false, - filter, - ret, - enlisted, - null, - drMap, - opCtx != null && opCtx.skipStore(), - singleRmv - ); + if (!F.isEmpty(filter)) + plc = opCtx != null ? opCtx.expiry() : null; + else + plc = null; - if (log.isDebugEnabled()) - log.debug("Remove keys: " + enlisted); + final IgniteInternalFuture<Void> loadFut = enlistWrite( + cacheCtx, + keys0, + plc, + /** lookup map */null, + /** invoke map */null, + /** invoke arguments */null, + retval, + /** lock only */false, + filter, + ret, + enlisted, + null, + drMap, + opCtx != null && opCtx.skipStore(), + singleRmv + ); - // Acquire locks only after having added operation to the write set. - // Otherwise, during rollback we will not know whether locks need - // to be rolled back. - if (pessimistic()) { - // Loose all skipped. - final Collection<KeyCacheObject> passedKeys = F.view(enlisted, F0.notIn(loadFut.get())); + if (log.isDebugEnabled()) + log.debug("Remove keys: " + enlisted); - if (log.isDebugEnabled()) - log.debug("Before acquiring transaction lock for remove on keys: " + passedKeys); + // Acquire locks only after having added operation to the write set. + // Otherwise, during rollback we will not know whether locks need + // to be rolled back. + if (pessimistic()) { + assert loadFut.isDone() : loadFut; - IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(passedKeys, - lockTimeout(), - this, - false, - retval, - isolation, - isInvalidate(), - -1L); + if (log.isDebugEnabled()) + log.debug("Before acquiring transaction lock for remove on keys: " + enlisted); - PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { - @Override protected GridCacheReturn postLock(GridCacheReturn ret) - throws IgniteCheckedException - { - if (log.isDebugEnabled()) - log.debug("Acquired transaction lock for remove on keys: " + passedKeys); + IgniteInternalFuture<Boolean> fut = cacheCtx.cache().txLockAsync(enlisted, + lockTimeout(), + this, + false, + retval, + isolation, + isInvalidate(), + -1L); + + PLC1<GridCacheReturn> plc1 = new PLC1<GridCacheReturn>(ret) { + @Override protected GridCacheReturn postLock(GridCacheReturn ret) + throws IgniteCheckedException + { + if (log.isDebugEnabled()) + log.debug("Acquired transaction lock for remove on keys: " + enlisted); - postLockWrite(cacheCtx, - passedKeys, - loadFut.get(), - ret, + postLockWrite(cacheCtx, + enlisted, + ret, /*remove*/true, - retval, + retval, /*read*/false, - -1L, - filter, + -1L, + filter, /*computeInvoke*/false); - return ret; - } - }; + return ret; + } + }; - if (fut.isDone()) { + if (fut.isDone()) { + try { + return nonInterruptable(plc1.apply(fut.get(), null)); + } + catch (GridClosureException e) { + return new GridFinishedFuture<>(e.unwrap()); + } + catch (IgniteCheckedException e) { try { - return nonInterruptable(plc1.apply(fut.get(), null)); + return nonInterruptable(plc1.apply(false, e)); } - catch (GridClosureException e) { - return new GridFinishedFuture<>(e.unwrap()); - } - catch (IgniteCheckedException e) { - try { - return nonInterruptable(plc1.apply(false, e)); - } - catch (Exception e1) { - return new GridFinishedFuture<>(e1); - } + catch (Exception e1) { + return new GridFinishedFuture<>(e1); } } - else - return nonInterruptable(new GridEmbeddedFuture<>( - fut, - plc1 - )); } - else { - if (implicit()) { - // Should never load missing values for implicit transaction as values will be returned - // with prepare response, if required. - assert loadFut.isDone(); - - return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) - throws IgniteCheckedException { - try { - txFut.get(); + else + return nonInterruptable(new GridEmbeddedFuture<>( + fut, + plc1 + )); + } + else { + if (implicit()) { + // Should never load missing values for implicit transaction as values will be returned + // with prepare response, if required. + assert loadFut.isDone(); - return implicitRes; - } - catch (IgniteCheckedException | RuntimeException e) { - rollbackAsync(); + return nonInterruptable(commitAsync().chain(new CX1<IgniteInternalFuture<IgniteInternalTx>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<IgniteInternalTx> txFut) + throws IgniteCheckedException { + try { + txFut.get(); - throw e; - } + return implicitRes; } - })); - } - else - return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Set<KeyCacheObject>>, GridCacheReturn>() { - @Override public GridCacheReturn applyx(IgniteInternalFuture<Set<KeyCacheObject>> f) - throws IgniteCheckedException { - f.get(); + catch (IgniteCheckedException | RuntimeException e) { + rollbackAsync(); - return ret; + throw e; } - })); + } + })); } - } - catch (IgniteCheckedException e) { - setRollbackOnly(); + else { + return nonInterruptable(loadFut.chain(new CX1<IgniteInternalFuture<Void>, GridCacheReturn>() { + @Override public GridCacheReturn applyx(IgniteInternalFuture<Void> f) + throws IgniteCheckedException { + f.get(); - return new GridFinishedFuture<>(e); + return ret; + } + })); + } } } @@ -3169,16 +3433,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @return {@code True} if transaction was successfully started. */ public boolean init() { - if (txMap == null) { - txMap = new LinkedHashMap<>(txSize > 0 ? txSize : 16, 1.0f); + return !txState.init(txSize) || cctx.tm().onStarted(this); - readView = new IgniteTxMap(txMap, CU.reads()); - writeView = new IgniteTxMap(txMap, CU.writes()); - - return cctx.tm().onStarted(this); - } - - return true; } /** @@ -3188,38 +3444,8 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter * @throws IgniteCheckedException If caches already enlisted in this transaction are not compatible with given * cache (e.g. they have different stores). */ - protected void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException { - int cacheId = cacheCtx.cacheId(); - - // Check if we can enlist new cache to transaction. - if (!activeCacheIds.contains(cacheId)) { - String err = cctx.verifyTxCompatibility(this, activeCacheIds, cacheCtx); - - if (err != null) { - StringBuilder cacheNames = new StringBuilder(); - - int idx = 0; - - for (Integer activeCacheId : activeCacheIds) { - cacheNames.append(cctx.cacheContext(activeCacheId).name()); - - if (idx++ < activeCacheIds.size() - 1) - cacheNames.append(", "); - } - - throw new IgniteCheckedException("Failed to enlist new cache to existing transaction (" + - err + - ") [activeCaches=[" + cacheNames + "]" + - ", cacheName=" + cacheCtx.name() + - ", cacheSystem=" + cacheCtx.systemTx() + - ", txSystem=" + system() + ']'); - } - else - activeCacheIds.add(cacheId); - - if (activeCacheIds.size() == 1) - depEnabled = cacheCtx.deploymentEnabled(); - } + protected final void addActiveCache(GridCacheContext cacheCtx) throws IgniteCheckedException { + txState.addActiveCache(cacheCtx, this); } /** @@ -3294,7 +3520,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter "Invalid tx state for adding entry [op=" + op + ", val=" + val + ", entry=" + entry + ", filter=" + Arrays.toString(filter) + ", txCtx=" + cctx.tm().txContextVersion() + ", tx=" + this + ']'; - IgniteTxEntry old = txMap.get(key); + IgniteTxEntry old = entry(key); // Keep old filter if already have one (empty filter is always overridden). if (!filtersSet || !F.isEmptyOrNulls(filter)) { @@ -3358,7 +3584,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter if (!hasDrTtl) txEntry.expiry(expiryPlc); - txMap.put(key, txEntry); + txState.addEntry(txEntry); if (log.isDebugEnabled()) log.debug("Created transaction entry: " + txEntry); @@ -3420,7 +3646,7 @@ public abstract class IgniteTxLocalAdapter extends IgniteTxAdapter /** {@inheritDoc} */ @Override public String toString() { return GridToStringBuilder.toString(IgniteTxLocalAdapter.class, this, "super", super.toString(), - "size", (txMap == null ? 0 : txMap.size())); + "size", allEntries().size()); } /**
http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java index 0d83338..5dc3338 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalEx.java @@ -93,9 +93,37 @@ public interface IgniteTxLocalEx extends IgniteInternalTx { /** * @param cacheCtx Cache context. + * @param key Key. + * @param val Value. + * @param retval Return value flag. + * @param filter Filter. + * @return Future for put operation. + */ + public <K, V> IgniteInternalFuture<GridCacheReturn> putAsync( + GridCacheContext cacheCtx, + K key, + V val, + boolean retval, + CacheEntryPredicate[] filter); + + /** + * @param cacheCtx Cache context. + * @param key Key. + * @param entryProcessor Entry processor. + * @param invokeArgs Optional arguments for entry processor. + * @return Operation future. + */ + public <K, V> IgniteInternalFuture<GridCacheReturn> invokeAsync( + GridCacheContext cacheCtx, + K key, + EntryProcessor<K, V, Object> entryProcessor, + Object... invokeArgs); + + /** + * @param cacheCtx Cache context. * @param map Entry processors map. * @param invokeArgs Optional arguments for entry processor. - * @return Transform operation future. + * @return Operation future. */ public <K, V, T> IgniteInternalFuture<GridCacheReturn> invokeAsync( GridCacheContext cacheCtx, http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java new file mode 100644 index 0000000..123d396 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalState.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +/** + * + */ +public interface IgniteTxLocalState extends IgniteTxState { + /** + * @param entry Entry. + */ + public void addEntry(IgniteTxEntry entry); + + /** + * @param txSize Transaction size. + * @return {@code True} if transaction was successfully started. + */ + public boolean init(int txSize); + + /** + * @return {@code True} if init method was called. + */ + public boolean initialized(); + + /** + * + */ + public void seal(); +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java new file mode 100644 index 0000000..cde5203 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxLocalStateAdapter.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * + */ +public abstract class IgniteTxLocalStateAdapter implements IgniteTxLocalState { + /** + * @param cacheCtx Cache context. + * @param tx Transaction. + * @param commit {@code False} if transaction rolled back. + */ + protected final void onTxEnd(GridCacheContext cacheCtx, IgniteInternalTx tx, boolean commit) { + if (cacheCtx.cache().configuration().isStatisticsEnabled()) { + // Convert start time from ms to ns. + if (commit) + cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000); + else + cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java index c2e7dea..ccccca0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxManager.java @@ -1092,13 +1092,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!tx.system()) cctx.txMetrics().onTxCommit(); - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx.cache().configuration().isStatisticsEnabled()) - // Convert start time from ms to ns. - cacheCtx.cache().metrics0().onTxCommit((U.currentTimeMillis() - tx.startTime()) * 1000); - } + tx.txState().onTxEnd(cctx, tx, true); } if (slowTxWarnTimeout > 0 && tx.local() && @@ -1163,13 +1157,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!tx.system()) cctx.txMetrics().onTxRollback(); - for (int cacheId : tx.activeCacheIds()) { - GridCacheContext cacheCtx = cctx.cacheContext(cacheId); - - if (cacheCtx.cache().configuration().isStatisticsEnabled()) - // Convert start time from ms to ns. - cacheCtx.cache().metrics0().onTxRollback((U.currentTimeMillis() - tx.startTime()) * 1000); - } + tx.txState().onTxEnd(cctx, tx, false); } if (log.isDebugEnabled()) @@ -1233,7 +1221,7 @@ public class IgniteTxManager extends GridCacheSharedManagerAdapter { if (!tx.system()) threadMap.remove(tx.threadId(), tx); else { - Integer cacheId = F.first(tx.activeCacheIds()); + Integer cacheId = tx.txState().firstCacheId(); if (cacheId != null) sysThreadMap.remove(new TxThreadKey(tx.threadId(), cacheId), tx); http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java index 6408573..429c995 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxMap.java @@ -170,8 +170,7 @@ public class IgniteTxMap extends AbstractMap<IgniteTxKey, IgniteTxEntry> impleme } /** {@inheritDoc} */ - @Nullable - @Override public IgniteTxEntry get(Object key) { + @Nullable @Override public IgniteTxEntry get(Object key) { IgniteTxEntry e = txMap.get(key); return e == null ? null : filter.apply(e) ? e : null; http://git-wip-us.apache.org/repos/asf/ignite/blob/3ff71fd7/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java index 9660e4e..b80909f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/IgniteTxRemoteEx.java @@ -25,22 +25,13 @@ import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; */ public interface IgniteTxRemoteEx extends IgniteInternalTx { /** - * @return Remote thread ID. - */ - public long remoteThreadId(); - - /** * @param baseVer Base version. * @param committedVers Committed version. * @param rolledbackVers Rolled back version. * @param pendingVers Pending versions. */ - public void doneRemote(GridCacheVersion baseVer, Collection<GridCacheVersion> committedVers, - Collection<GridCacheVersion> rolledbackVers, Collection<GridCacheVersion> pendingVers); - - /** - * @param e Sets write value for pessimistic transactions. - * @return {@code True} if entry was found. - */ - public boolean setWriteValue(IgniteTxEntry e); + public void doneRemote(GridCacheVersion baseVer, + Collection<GridCacheVersion> committedVers, + Collection<GridCacheVersion> rolledbackVers, + Collection<GridCacheVersion> pendingVers); } \ No newline at end of file