http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java index 4d5fa13..a83a93f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java @@ -42,8 +42,6 @@ import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; import org.apache.ignite.internal.processors.cache.GridCacheReturn; import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; import org.apache.ignite.internal.processors.cache.KeyCacheObject; -import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; -import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; @@ -61,6 +59,10 @@ import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSing import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; +import org.apache.ignite.internal.processors.cache.mvcc.MvccQueryTracker; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; +import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; @@ -68,6 +70,7 @@ import org.apache.ignite.internal.util.future.GridEmbeddedFuture; import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.lang.IgnitePair; import org.apache.ignite.internal.util.typedef.C2; +import org.apache.ignite.internal.util.typedef.CI1; import org.apache.ignite.internal.util.typedef.CI2; import org.apache.ignite.internal.util.typedef.CX1; import org.apache.ignite.internal.util.typedef.F; @@ -186,13 +189,14 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKey(key); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = ctx.mvccEnabled() ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx); final CacheOperationContext opCtx = ctx.operationContextPerCall(); final boolean recovery = opCtx != null && opCtx.recovery(); - if (tx != null && !tx.implicit() && !skipTx) { + // Get operation bypass Tx in Mvcc mode. + if (!ctx.mvccEnabled() && tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<V>() { @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx, @@ -230,6 +234,26 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte subjId = ctx.subjectIdPerCall(subjId, opCtx); + MvccSnapshot mvccSnapshot = null; + MvccQueryTracker mvccTracker = null; + + if (ctx.mvccEnabled()) { + try { + if (tx != null) + mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx); + else { + mvccTracker = MvccUtils.mvccTracker(ctx, null); + + mvccSnapshot = mvccTracker.snapshot(); + } + + assert mvccSnapshot != null; + } + catch (IgniteCheckedException ex) { + return new GridFinishedFuture<>(ex); + } + } + GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, ctx.toCacheKeyObject(key), topVer, @@ -243,10 +267,21 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte needVer, /*keepCacheObjects*/false, opCtx != null && opCtx.recovery(), - null); + mvccSnapshot); fut.init(); + if(mvccTracker != null){ + final MvccQueryTracker mvccTracker0 = mvccTracker; + + fut.listen(new CI1<IgniteInternalFuture<Object>>() { + @Override public void apply(IgniteInternalFuture<Object> future) { + if(future.isDone()) + mvccTracker0.onDone(); + } + }); + } + return (IgniteInternalFuture<V>)fut; } @@ -270,13 +305,15 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte if (keyCheck) validateCacheKeys(keys); - GridNearTxLocal tx = ctx.tm().threadLocalTx(ctx); + GridNearTxLocal tx = (ctx.mvccEnabled()) ? MvccUtils.tx(ctx.kernalContext()) : ctx.tm().threadLocalTx(ctx); final CacheOperationContext opCtx = ctx.operationContextPerCall(); - if (tx != null && !tx.implicit() && !skipTx) { + if (!ctx.mvccEnabled() && tx != null && !tx.implicit() && !skipTx) { return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { - @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, + AffinityTopologyVersion readyTopVer) { return tx.getAllAsync(ctx, readyTopVer, ctx.cacheKeysView(keys), @@ -290,14 +327,34 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte }, opCtx, /*retry*/false); } - AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); - subjId = ctx.subjectIdPerCall(subjId, opCtx); - return loadAsync( + MvccSnapshot mvccSnapshot = null; + MvccQueryTracker mvccTracker = null; + + if (ctx.mvccEnabled()) { + try { + if (tx != null) + mvccSnapshot = MvccUtils.requestSnapshot(ctx, tx); + else { + mvccTracker = MvccUtils.mvccTracker(ctx, null); + + mvccSnapshot = mvccTracker.snapshot(); + } + + assert mvccSnapshot != null; + } + catch (IgniteCheckedException ex) { + return new GridFinishedFuture(ex); + } + } + + AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion(); + + IgniteInternalFuture<Map<K, V>> fut = loadAsync( ctx.cacheKeysView(keys), opCtx == null || !opCtx.skipStore(), - forcePrimary, + forcePrimary , topVer, subjId, taskName, @@ -305,46 +362,23 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte recovery, skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), skipVals, - needVer); - } - - /** - * @param keys Keys to load. - * @param readThrough Read through flag. - * @param forcePrimary Force get from primary node flag. - * @param topVer Topology version. - * @param subjId Subject ID. - * @param taskName Task name. - * @param deserializeBinary Deserialize binary flag. - * @param expiryPlc Expiry policy. - * @param skipVals Skip values flag. - * @param needVer Need version. - * @return Loaded values. - */ - private IgniteInternalFuture<Map<K, V>> loadAsync( - @Nullable Collection<KeyCacheObject> keys, - boolean readThrough, - boolean forcePrimary, - AffinityTopologyVersion topVer, - @Nullable UUID subjId, - String taskName, - boolean deserializeBinary, - boolean recovery, - @Nullable IgniteCacheExpiryPolicy expiryPlc, - boolean skipVals, - boolean needVer) { - return loadAsync(keys, - readThrough, - forcePrimary, - topVer, subjId, - taskName, - deserializeBinary, - recovery, - expiryPlc, - skipVals, needVer, false, - null); + mvccSnapshot); + + if(mvccTracker != null){ + final MvccQueryTracker mvccTracker0 = mvccTracker; + + fut.listen(new CI1<IgniteInternalFuture<Map<K, V>>>() { + /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture<Map<K, V>> future) { + if(future.isDone()) + mvccTracker0.onDone(); + } + }); + } + + return fut; } /** @@ -445,7 +479,9 @@ public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapte for (KeyCacheObject key : keys) { if (readNoEntry) { - CacheDataRow row = ctx.offheap().read(ctx, key); + CacheDataRow row = mvccSnapshot != null ? + ctx.offheap().mvccRead(ctx, key, mvccSnapshot) : + ctx.offheap().read(ctx, key); if (row != null) { long expireTime = row.expireTime();
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java index b167f26..85a48a3 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearPessimisticTxPrepareFuture.java @@ -287,10 +287,15 @@ public class GridNearPessimisticTxPrepareFuture extends GridNearTxPrepareFutureA boolean queryMapped = false; - for (GridDistributedTxMapping m : F.view(tx.mappings().mappings(), CU.FILTER_QUERY_MAPPING)) { + assert !tx.implicitSingle() || tx.queryEnlisted(); // Non-mvcc implicit-single tx goes fast commit way. + + Collection<GridDistributedTxMapping> txMappings = !tx.implicitSingle() ? tx.mappings().mappings() + : Collections.singleton(tx.mappings().singleMapping()); + + for (GridDistributedTxMapping m : F.view(txMappings, CU.FILTER_QUERY_MAPPING)) { GridDistributedTxMapping nodeMapping = mappings.get(m.primary().id()); - if(nodeMapping == null) + if (nodeMapping == null) mappings.put(m.primary().id(), m); txMapping.addMapping(F.asList(m.primary())); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java index f484bd6..11f98ca 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxAbstractEnlistFuture.java @@ -42,10 +42,10 @@ import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; import org.apache.ignite.internal.util.tostring.GridToStringExclude; import org.apache.ignite.internal.util.typedef.CI1; -import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgniteReducer; import org.apache.ignite.lang.IgniteUuid; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; @@ -53,11 +53,8 @@ import org.jetbrains.annotations.Nullable; /** * */ -public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundIdentityFuture<Long> implements - GridCacheVersionedFuture<Long> { - /** */ - private static final long serialVersionUID = -6069985059301497282L; - +public abstract class GridNearTxAbstractEnlistFuture<T> extends GridCacheCompoundIdentityFuture<T> implements + GridCacheVersionedFuture<T> { /** Done field updater. */ private static final AtomicIntegerFieldUpdater<GridNearTxAbstractEnlistFuture> DONE_UPD = AtomicIntegerFieldUpdater.newUpdater(GridNearTxAbstractEnlistFuture.class, "done"); @@ -117,10 +114,11 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId * @param cctx Cache context. * @param tx Transaction. * @param timeout Timeout. + * @param rdc Compound future reducer. */ public GridNearTxAbstractEnlistFuture( - GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout) { - super(CU.longReducer()); + GridCacheContext<?, ?> cctx, GridNearTxLocal tx, long timeout, @Nullable IgniteReducer<T, T> rdc) { + super(rdc); assert cctx != null; assert tx != null; @@ -300,8 +298,6 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId throw new IgniteCheckedException("Future is done."); } - - /** */ private void mapOnTopology() { @@ -359,7 +355,7 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId } /** {@inheritDoc} */ - @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<Long> fut) { + @Override protected boolean processFailure(Throwable err, IgniteInternalFuture<T> fut) { if (ex != null || !EX_UPD.compareAndSet(this, null, err)) ex.addSuppressed(err); @@ -367,7 +363,7 @@ public abstract class GridNearTxAbstractEnlistFuture extends GridCacheCompoundId } /** {@inheritDoc} */ - @Override public boolean onDone(@Nullable Long res, @Nullable Throwable err, boolean cancelled) { + @Override public boolean onDone(@Nullable T res, @Nullable Throwable err, boolean cancelled) { if (!DONE_UPD.compareAndSet(this, 0, 1)) return false; http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java new file mode 100644 index 0000000..8d85bd9 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistFuture.java @@ -0,0 +1,683 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxAbstractEnlistFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxEnlistFuture; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshotWithoutTxs; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.EnlistOperation; +import org.apache.ignite.internal.processors.query.UpdateSourceIterator; +import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.CI1; +import org.apache.ignite.internal.util.typedef.F; +import org.apache.ignite.internal.util.typedef.X; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.internal.processors.cache.distributed.dht.NearTxResultHandler.createResponse; +import static org.apache.ignite.internal.processors.cache.mvcc.MvccUtils.MVCC_OP_COUNTER_NA; +import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; +import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; + +/** + * A future tracking requests for remote nodes transaction enlisting and locking produces by cache API operations. + */ +public class GridNearTxEnlistFuture extends GridNearTxAbstractEnlistFuture<GridCacheReturn> { + /** Default batch size. */ + public static final int DFLT_BATCH_SIZE = 1024; + + /** SkipCntr field updater. */ + private static final AtomicIntegerFieldUpdater<GridNearTxEnlistFuture> SKIP_UPD = + AtomicIntegerFieldUpdater.newUpdater(GridNearTxEnlistFuture.class, "skipCntr"); + + /** Marker object. */ + private static final Object FINISHED = new Object(); + + /** Source iterator. */ + @GridToStringExclude + private final UpdateSourceIterator<?> it; + + /** Batch size. */ + private int batchSize; + + /** */ + private AtomicInteger batchCntr = new AtomicInteger(); + + /** */ + @SuppressWarnings("unused") + @GridToStringExclude + private volatile int skipCntr; + + /** Future result. */ + @GridToStringExclude + private volatile GridCacheReturn res; + + /** */ + private final Map<UUID, Batch> batches = new ConcurrentHashMap<>(); + + /** Row extracted from iterator but not yet used. */ + private Object peek; + + /** Topology locked flag. */ + private boolean topLocked; + + /** Ordered batch sending flag. */ + private final boolean sequential; + + /** Filter. */ + private final CacheEntryPredicate filter; + + /** Need previous value flag. */ + private final boolean needRes; + + /** + * @param cctx Cache context. + * @param tx Transaction. + * @param timeout Timeout. + * @param it Rows iterator. + * @param batchSize Batch size. + * @param sequential Sequential locking flag. + * @param filter Filter. + * @param needRes Need previous value flag. + */ + public GridNearTxEnlistFuture(GridCacheContext<?, ?> cctx, + GridNearTxLocal tx, + long timeout, + UpdateSourceIterator<?> it, + int batchSize, + boolean sequential, + @Nullable CacheEntryPredicate filter, + boolean needRes) { + super(cctx, tx, timeout, null); + + this.it = it; + this.batchSize = batchSize > 0 ? batchSize : DFLT_BATCH_SIZE; + this.sequential = sequential; + this.filter = filter; + this.needRes = needRes; + } + + /** {@inheritDoc} */ + @Override protected void map(boolean topLocked) { + this.topLocked = topLocked; + + sendNextBatches(null); + } + + /** + * Continue iterating the data rows and form new batches. + * + * @param nodeId Node that is ready for a new batch. + */ + private void sendNextBatches(@Nullable UUID nodeId) { + try { + Collection<Batch> next = continueLoop(nodeId); + + if (next == null) + return; + + boolean first = (nodeId != null); + + for (Batch batch : next) { + ClusterNode node = batch.node(); + + sendBatch(node, batch, first); + + if (!node.isLocal()) + first = false; + } + } + catch (Throwable e) { + onDone(e); + + if (e instanceof Error) + throw (Error)e; + } + } + + /** + * Iterate data rows and form batches. + * + * @param nodeId Id of node acknowledged the last batch. + * @return Collection of newly completed batches. + * @throws IgniteCheckedException If failed. + */ + private Collection<Batch> continueLoop(@Nullable UUID nodeId) throws IgniteCheckedException { + if (nodeId != null) + batches.remove(nodeId); + + // Accumulate number of batches released since we got here. + // Let only one thread do the looping. + if (isDone() || SKIP_UPD.getAndIncrement(this) != 0) + return null; + + ArrayList<Batch> res = null; + Batch batch = null; + + boolean flush = false; + + EnlistOperation op = it.operation(); + + while (true) { + while (hasNext0()) { + checkCompleted(); + + Object cur = next0(); + + KeyCacheObject key = cctx.toCacheKeyObject(op.isDeleteOrLock() ? cur : ((IgniteBiTuple)cur).getKey()); + + List<ClusterNode> nodes = cctx.affinity().nodesByKey(key, topVer); + + ClusterNode node; + + if (F.isEmpty(nodes) || ((node = nodes.get(0)) == null)) + throw new ClusterTopologyCheckedException("Failed to get primary node " + + "[topVer=" + topVer + ", key=" + key + ']'); + + tx.markQueryEnlisted(null); + + if (!sequential) + batch = batches.get(node.id()); + else if (batch != null && !batch.node().equals(node)) + res = markReady(res, batch); + + if (batch == null) + batches.put(node.id(), batch = new Batch(node)); + + if (batch.ready()) { + // Can't advance further at the moment. + batch = null; + + peek = cur; + + it.beforeDetach(); + + flush = true; + + break; + } + + batch.add(op.isDeleteOrLock() ? key : cur, + op != EnlistOperation.LOCK && cctx.affinityNode() && (cctx.isReplicated() || nodes.indexOf(cctx.localNode()) > 0)); + + if (batch.size() == batchSize) + res = markReady(res, batch); + } + + if (SKIP_UPD.decrementAndGet(this) == 0) + break; + + skipCntr = 1; + } + + if (flush) + return res; + + // No data left - flush incomplete batches. + for (Batch batch0 : batches.values()) { + if (!batch0.ready()) { + if (res == null) + res = new ArrayList<>(); + + batch0.ready(true); + + res.add(batch0); + } + } + + if (batches.isEmpty()) + onDone(this.res); + + return res; + } + + /** */ + private Object next0() { + if (!hasNext0()) + throw new NoSuchElementException(); + + Object cur; + + if ((cur = peek) != null) + peek = null; + else + cur = it.next(); + + return cur; + } + + /** */ + private boolean hasNext0() { + if (peek == null && !it.hasNext()) + peek = FINISHED; + + return peek != FINISHED; + } + + /** + * Add batch to batch collection if it is ready. + * + * @param batches Collection of batches. + * @param batch Batch to be added. + */ + private ArrayList<Batch> markReady(ArrayList<Batch> batches, Batch batch) { + if (!batch.ready()) { + batch.ready(true); + + if (batches == null) + batches = new ArrayList<>(); + + batches.add(batch); + } + + return batches; + } + + /** + * @param primaryId Primary node id. + * @param rows Rows. + * @param dhtVer Dht version assigned at primary node. + * @param dhtFutId Dht future id assigned at primary node. + */ + private void processBatchLocalBackupKeys(UUID primaryId, List<Object> rows, GridCacheVersion dhtVer, + IgniteUuid dhtFutId) { + assert dhtVer != null; + assert dhtFutId != null; + + EnlistOperation op = it.operation(); + + assert op != EnlistOperation.LOCK; + + boolean keysOnly = op.isDeleteOrLock(); + + final ArrayList<KeyCacheObject> keys = new ArrayList<>(rows.size()); + final ArrayList<Message> vals = keysOnly ? null : new ArrayList<>(rows.size()); + + for (Object row : rows) { + if (keysOnly) + keys.add(cctx.toCacheKeyObject(row)); + else { + keys.add(cctx.toCacheKeyObject(((IgniteBiTuple)row).getKey())); + vals.add(cctx.toCacheObject(((IgniteBiTuple)row).getValue())); + } + } + + try { + GridDhtTxRemote dhtTx = cctx.tm().tx(dhtVer); + + if (dhtTx == null) { + dhtTx = new GridDhtTxRemote(cctx.shared(), + cctx.localNodeId(), + dhtFutId, + primaryId, + lockVer, + topVer, + dhtVer, + null, + cctx.systemTx(), + cctx.ioPolicy(), + PESSIMISTIC, + REPEATABLE_READ, + false, + tx.remainingTime(), + -1, + this.tx.subjectId(), + this.tx.taskNameHash(), + false); + + dhtTx.mvccSnapshot(new MvccSnapshotWithoutTxs(mvccSnapshot.coordinatorVersion(), + mvccSnapshot.counter(), MVCC_OP_COUNTER_NA, mvccSnapshot.cleanupVersion())); + + dhtTx = cctx.tm().onCreated(null, dhtTx); + + if (dhtTx == null || !cctx.tm().onStarted(dhtTx)) { + throw new IgniteTxRollbackCheckedException("Failed to update backup " + + "(transaction has been completed): " + dhtVer); + } + } + + dhtTx.mvccEnlistBatch(cctx, it.operation(), keys, vals, mvccSnapshot.withoutActiveTransactions()); + } + catch (IgniteCheckedException e) { + onDone(e); + + return; + } + + sendNextBatches(primaryId); + } + + /** + * + * @param node Node. + * @param batch Batch. + * @param first First mapping flag. + */ + private void sendBatch(ClusterNode node, Batch batch, boolean first) throws IgniteCheckedException { + updateMappings(node); + + boolean clientFirst = first && cctx.localNode().isClient() && !topLocked && !tx.hasRemoteLocks(); + + int batchId = batchCntr.incrementAndGet(); + + if (node.isLocal()) + enlistLocal(batchId, node.id(), batch); + else + sendBatch(batchId, node.id(), batch, clientFirst); + } + + /** + * Send batch request to remote data node. + * + * @param batchId Id of a batch mini-future. + * @param nodeId Node id. + * @param batchFut Mini-future for the batch. + * @param clientFirst {@code true} if originating node is client and it is a first request to any data node. + */ + private void sendBatch(int batchId, UUID nodeId, Batch batchFut, boolean clientFirst) throws IgniteCheckedException { + assert batchFut != null; + + GridNearTxEnlistRequest req = new GridNearTxEnlistRequest(cctx.cacheId(), + threadId, + futId, + batchId, + tx.subjectId(), + topVer, + lockVer, + mvccSnapshot, + clientFirst, + remainingTime(), + tx.remainingTime(), + tx.taskNameHash(), + batchFut.rows(), + it.operation(), + needRes, + filter + ); + + sendRequest(req, nodeId); + } + + /** + * @param req Request. + * @param nodeId Remote node ID + * @throws IgniteCheckedException if failed to send. + */ + private void sendRequest(GridCacheMessage req, UUID nodeId) throws IgniteCheckedException { + IgniteInternalFuture<?> txSync = cctx.tm().awaitFinishAckAsync(nodeId, tx.threadId()); + + if (txSync == null || txSync.isDone()) + cctx.io().send(nodeId, req, cctx.ioPolicy()); + else + txSync.listen(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> future) { + try { + cctx.io().send(nodeId, req, cctx.ioPolicy()); + } + catch (IgniteCheckedException e) { + GridNearTxEnlistFuture.this.onDone(e); + } + } + }); + } + + /** + * Enlist batch of entries to the transaction on local node. + * + * @param batchId Id of a batch mini-future. + * @param nodeId Node id. + * @param batch Batch. + */ + private void enlistLocal(int batchId, UUID nodeId, Batch batch) throws IgniteCheckedException { + Collection<Object> rows = batch.rows(); + + GridDhtTxEnlistFuture fut = new GridDhtTxEnlistFuture(nodeId, + lockVer, + mvccSnapshot, + threadId, + futId, + batchId, + tx, + remainingTime(), + cctx, + rows, + it.operation(), + filter, + needRes); + + updateLocalFuture(fut); + + fut.listen(new CI1<IgniteInternalFuture<GridCacheReturn>>() { + @Override public void apply(IgniteInternalFuture<GridCacheReturn> fut) { + try { + clearLocalFuture((GridDhtTxAbstractEnlistFuture)fut); + + GridNearTxEnlistResponse res = fut.error() == null ? createResponse(fut) : null; + + if (checkResponse(nodeId, res, fut.error())) + sendNextBatches(nodeId); + } + catch (IgniteCheckedException e) { + checkResponse(nodeId, null, e); + } + finally { + CU.unwindEvicts(cctx); + } + } + }); + + fut.init(); + } + + /** + * @param nodeId Sender node id. + * @param res Response. + */ + public void onResult(UUID nodeId, GridNearTxEnlistResponse res) { + if (checkResponse(nodeId, res, res.error())) { + + Batch batch = batches.get(nodeId); + + if (batch != null && !F.isEmpty(batch.localBackupRows()) && res.dhtFutureId() != null) + processBatchLocalBackupKeys(nodeId, batch.localBackupRows(), res.dhtVersion(), res.dhtFutureId()); + else + sendNextBatches(nodeId); + } + } + + /** {@inheritDoc} */ + @Override public boolean onNodeLeft(UUID nodeId) { + if (batches.keySet().contains(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Found unacknowledged batch for left node [nodeId=" + nodeId + ", fut=" + + this + ']'); + + ClusterTopologyCheckedException topEx = new ClusterTopologyCheckedException("Failed to enlist keys " + + "(primary node left grid, retry transaction if possible) [node=" + nodeId + ']'); + + topEx.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(topVer)); + + processFailure(topEx, null); + + batches.remove(nodeId); + + if (batches.isEmpty()) // Wait for all pending requests. + onDone(); + + } + + if (log.isDebugEnabled()) + log.debug("Future does not have mapping for left node (ignoring) [nodeId=" + nodeId + + ", fut=" + this + ']'); + + return false; + } + + /** + * @param nodeId Originating node ID. + * @param res Response. + * @param err Exception. + * @return {@code True} if future was completed by this call. + */ + @SuppressWarnings("unchecked") + public boolean checkResponse(UUID nodeId, GridNearTxEnlistResponse res, Throwable err) { + assert res != null || err != null : this; + + if (err == null && res.error() != null) + err = res.error(); + + if (X.hasCause(err, ClusterTopologyCheckedException.class)) + tx.removeMapping(nodeId); + + if (err != null) + processFailure(err, null); + + if (ex != null) { + batches.remove(nodeId); + + if (batches.isEmpty()) // Wait for all pending requests. + onDone(); + + return false; + } + + assert res != null; + + this.res = res.result(); + + assert this.res != null && (this.res.emptyResult() || needRes || !this.res.success()); + + return true; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearTxEnlistFuture.class, this, super.toString()); + } + + /** + * A batch of rows + */ + private static class Batch { + /** Node ID. */ + @GridToStringExclude + private final ClusterNode node; + + /** Rows. */ + private List<Object> rows = new ArrayList<>(); + + /** Local backup rows. */ + private List<Object> locBkpRows; + + /** Readiness flag. Set when batch is full or no new rows are expected. */ + private boolean ready; + + /** + * @param node Cluster node. + */ + private Batch(ClusterNode node) { + this.node = node; + } + + /** + * @return Node. + */ + public ClusterNode node() { + return node; + } + + /** + * Adds a row. + * + * @param row Row. + * @param localBackup {@code true}, when the row key has local backup. + */ + public void add(Object row, boolean localBackup) { + rows.add(row); + + if (localBackup) { + if (locBkpRows == null) + locBkpRows = new ArrayList<>(); + + locBkpRows.add(row); + } + } + + /** + * @return number of rows. + */ + public int size() { + return rows.size(); + } + + /** + * @return Collection of rows. + */ + public Collection<Object> rows() { + return rows; + } + + /** + * @return Collection of local backup rows. + */ + public List<Object> localBackupRows() { + return locBkpRows; + } + + /** + * @return Readiness flag. + */ + public boolean ready() { + return ready; + } + + /** + * Sets readiness flag. + * + * @param ready Flag value. + */ + public void ready(boolean ready) { + this.ready = ready; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java new file mode 100644 index 0000000..1d87023 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistRequest.java @@ -0,0 +1,642 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collection; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; +import org.apache.ignite.internal.processors.cache.CacheObject; +import org.apache.ignite.internal.processors.cache.CacheObjectContext; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.KeyCacheObject; +import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.query.EnlistOperation; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.lang.IgniteBiTuple; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * Request to enlist into transaction and acquire locks for entries produced with Cache API operations. + * + * One request per batch of entries is used. + */ +public class GridNearTxEnlistRequest extends GridCacheIdMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private long threadId; + + /** Future id. */ + private IgniteUuid futId; + + /** */ + private boolean clientFirst; + + /** */ + private int miniId; + + /** */ + private UUID subjId; + + /** */ + private AffinityTopologyVersion topVer; + + /** */ + private GridCacheVersion lockVer; + + /** Mvcc snapshot. */ + private MvccSnapshot mvccSnapshot; + + /** */ + private long timeout; + + /** */ + private long txTimeout; + + /** */ + private int taskNameHash; + + /** Rows to enlist. */ + @GridDirectTransient + private Collection<Object> rows; + + /** Serialized rows keys. */ + @GridToStringExclude + private KeyCacheObject[] keys; + + /** Serialized rows values. */ + @GridToStringExclude + private CacheObject[] values; + + /** Enlist operation. */ + private EnlistOperation op; + + /** Filter. */ + @GridToStringExclude + private CacheEntryPredicate filter; + + /** Need previous value flag. */ + private boolean needRes; + + /** + * Default constructor. + */ + public GridNearTxEnlistRequest() { + // No-op. + } + + /** + * Constructor. + * + * @param cacheId Cache id. + * @param threadId Thread id. + * @param futId Future id. + * @param miniId Mini-future id. + * @param subjId Transaction subject id. + * @param topVer Topology version. + * @param lockVer Lock version. + * @param mvccSnapshot Mvcc snapshot. + * @param clientFirst First client request flag. + * @param timeout Timeout. + * @param txTimeout Tx timeout. + * @param taskNameHash Task name hash. + * @param rows Rows. + * @param op Operation. + * @param filter Filter. + */ + GridNearTxEnlistRequest(int cacheId, + long threadId, + IgniteUuid futId, + int miniId, + UUID subjId, + AffinityTopologyVersion topVer, + GridCacheVersion lockVer, + MvccSnapshot mvccSnapshot, + boolean clientFirst, + long timeout, + long txTimeout, + int taskNameHash, + Collection<Object> rows, + EnlistOperation op, + boolean needRes, + @Nullable CacheEntryPredicate filter) { + this.txTimeout = txTimeout; + this.filter = filter; + this.cacheId = cacheId; + this.threadId = threadId; + this.futId = futId; + this.miniId = miniId; + this.subjId = subjId; + this.topVer = topVer; + this.lockVer = lockVer; + this.mvccSnapshot = mvccSnapshot; + this.clientFirst = clientFirst; + this.timeout = timeout; + this.taskNameHash = taskNameHash; + this.rows = rows; + this.op = op; + this.needRes = needRes; + } + + /** + * @return Thread id. + */ + public long threadId() { + return threadId; + } + + /** + * @return Future id. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Mini future ID. + */ + public int miniId() { + return miniId; + } + + /** + * @return Subject id. + */ + public UUID subjectId() { + return subjId; + } + + /** + * @return Topology version. + */ + @Override public AffinityTopologyVersion topologyVersion() { + return topVer; + } + + /** + * @return Lock version. + */ + public GridCacheVersion version() { + return lockVer; + } + + /** + * @return MVCC snapshot. + */ + public MvccSnapshot mvccSnapshot() { + return mvccSnapshot; + } + + /** + * @return Timeout milliseconds. + */ + public long timeout() { + return timeout; + } + + /** + * @return Tx timeout milliseconds. + */ + public long txTimeout() { + return txTimeout; + } + + /** + * @return Task name hash. + */ + public int taskNameHash() { + return taskNameHash; + } + + /** + * @return {@code True} if this is the first client request. + */ + public boolean firstClientRequest() { + return clientFirst; + } + + /** + * @return Collection of rows. + */ + public Collection<Object> rows() { + return rows; + } + + /** + * @return Operation. + */ + public EnlistOperation operation() { + return op; + } + + /** + * @return Need result flag. + */ + public boolean needRes() { + return needRes; + } + + /** + * @return Filter. + */ + public CacheEntryPredicate filter() { + return filter; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + CacheObjectContext objCtx = cctx.cacheObjectContext(); + + if (rows != null && keys == null) { + keys = new KeyCacheObject[rows.size()]; + + int i = 0; + + boolean keysOnly = op.isDeleteOrLock(); + + values = keysOnly ? null : new CacheObject[keys.length]; + + for (Object row : rows) { + Object key, val = null; + + if (keysOnly) + key = row; + else { + key = ((IgniteBiTuple)row).getKey(); + val = ((IgniteBiTuple)row).getValue(); + } + + assert key != null && (keysOnly || val != null) : "key=" + key + ", val=" + val; + + KeyCacheObject key0 = cctx.toCacheKeyObject(key); + + assert key0 != null; + + key0.prepareMarshal(objCtx); + + keys[i] = key0; + + if (!keysOnly) { + CacheObject val0 = cctx.toCacheObject(val); + + assert val0 != null; + + val0.prepareMarshal(objCtx); + + values[i] = val0; + } + + i++; + } + } + + if (filter != null) + filter.prepareMarshal(cctx); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (keys != null) { + rows = new ArrayList<>(keys.length); + + CacheObjectContext objCtx = ctx.cacheContext(cacheId).cacheObjectContext(); + + for (int i = 0; i < keys.length; i++) { + keys[i].finishUnmarshal(objCtx, ldr); + + if (op.isDeleteOrLock()) + rows.add(keys[i]); + else { + if (values[i] != null) + values[i].finishUnmarshal(objCtx, ldr); + + rows.add(new IgniteBiTuple<>(keys[i], values[i])); + } + } + + keys = null; + values = null; + } + + if (filter != null) + filter.finishUnmarshal(ctx.cacheContext(cacheId), ldr); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeBoolean("clientFirst", clientFirst)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("filter", filter)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeIgniteUuid("futId", futId)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeObjectArray("keys", keys, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("lockVer", lockVer)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeInt("miniId", miniId)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeMessage("mvccSnapshot", mvccSnapshot)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeBoolean("needRes", needRes)) + return false; + + writer.incrementState(); + + case 11: + if (!writer.writeByte("op", op != null ? (byte)op.ordinal() : -1)) + return false; + + writer.incrementState(); + + case 12: + if (!writer.writeUuid("subjId", subjId)) + return false; + + writer.incrementState(); + + case 13: + if (!writer.writeInt("taskNameHash", taskNameHash)) + return false; + + writer.incrementState(); + + case 14: + if (!writer.writeLong("threadId", threadId)) + return false; + + writer.incrementState(); + + case 15: + if (!writer.writeLong("timeout", timeout)) + return false; + + writer.incrementState(); + + case 16: + if (!writer.writeMessage("topVer", topVer)) + return false; + + writer.incrementState(); + + case 17: + if (!writer.writeLong("txTimeout", txTimeout)) + return false; + + writer.incrementState(); + + case 18: + if (!writer.writeObjectArray("values", values, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + clientFirst = reader.readBoolean("clientFirst"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + filter = reader.readMessage("filter"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + futId = reader.readIgniteUuid("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + keys = reader.readObjectArray("keys", MessageCollectionItemType.MSG, KeyCacheObject.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + lockVer = reader.readMessage("lockVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + miniId = reader.readInt("miniId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + mvccSnapshot = reader.readMessage("mvccSnapshot"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + needRes = reader.readBoolean("needRes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 11: + byte opOrd; + + opOrd = reader.readByte("op"); + + if (!reader.isLastRead()) + return false; + + op = EnlistOperation.fromOrdinal(opOrd); + + reader.incrementState(); + + case 12: + subjId = reader.readUuid("subjId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 13: + taskNameHash = reader.readInt("taskNameHash"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 14: + threadId = reader.readLong("threadId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 15: + timeout = reader.readLong("timeout"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 16: + topVer = reader.readMessage("topVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 17: + txTimeout = reader.readLong("txTimeout"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 18: + values = reader.readObjectArray("values", MessageCollectionItemType.MSG, CacheObject.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearTxEnlistRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 19; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 159; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearTxEnlistRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7f834bf/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java new file mode 100644 index 0000000..4f4bbb6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxEnlistResponse.java @@ -0,0 +1,372 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.distributed.near; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheIdMessage; +import org.apache.ignite.internal.processors.cache.GridCacheReturn; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.distributed.dht.ExceptionAware; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; +import org.jetbrains.annotations.Nullable; + +/** + * A response to {@link GridNearTxEnlistRequest}. + */ +public class GridNearTxEnlistResponse extends GridCacheIdMessage implements ExceptionAware { + /** */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private IgniteUuid futId; + + /** Error. */ + @GridDirectTransient + private Throwable err; + + /** Serialized error. */ + private byte[] errBytes; + + /** Mini future id. */ + private int miniId; + + /** Result. */ + private GridCacheReturn res; + + /** */ + private GridCacheVersion lockVer; + + /** */ + private GridCacheVersion dhtVer; + + /** */ + private IgniteUuid dhtFutId; + + /** New DHT nodes involved into transaction. */ + @GridDirectCollection(UUID.class) + private Collection<UUID> newDhtNodes; + + /** + * Default constructor. + */ + public GridNearTxEnlistResponse() { + // No-op. + } + + /** + * Constructor for normal result. + * + * @param cacheId Cache id. + * @param futId Future id. + * @param miniId Mini future id. + * @param lockVer Lock version. + * @param res Result. + * @param dhtVer Dht version. + * @param dhtFutId Dht future id. + * @param newDhtNodes New DHT nodes involved into transaction. + */ + public GridNearTxEnlistResponse(int cacheId, + IgniteUuid futId, + int miniId, + GridCacheVersion lockVer, + GridCacheReturn res, + GridCacheVersion dhtVer, + IgniteUuid dhtFutId, + Set<UUID> newDhtNodes) { + this.cacheId = cacheId; + this.futId = futId; + this.miniId = miniId; + this.lockVer = lockVer; + this.res = res; + this.dhtVer = dhtVer; + this.dhtFutId = dhtFutId; + this.newDhtNodes = newDhtNodes; + } + + /** + * Constructor for error result. + * + * @param cacheId Cache id. + * @param futId Future id. + * @param miniId Mini future id. + * @param lockVer Lock version. + * @param err Error. + */ + public GridNearTxEnlistResponse(int cacheId, IgniteUuid futId, int miniId, GridCacheVersion lockVer, + Throwable err) { + this.cacheId = cacheId; + this.futId = futId; + this.miniId = miniId; + this.lockVer = lockVer; + this.err = err; + } + + /** + * @return Loc version. + */ + public GridCacheVersion version() { + return lockVer; + } + + /** + * @return Future id. + */ + public IgniteUuid futureId() { + return futId; + } + + /** + * @return Mini future id. + */ + public int miniId() { + return miniId; + } + + /** + * @return Result. + */ + public GridCacheReturn result() { + return res; + } + + /** {@inheritDoc} */ + @Nullable @Override public Throwable error() { + return err; + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return false; + } + + /** + * @return Dht version. + */ + public GridCacheVersion dhtVersion() { + return dhtVer; + } + + /** + * @return Dht future id. + */ + public IgniteUuid dhtFutureId() { + return dhtFutId; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 11; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeIgniteUuid("dhtFutId", dhtFutId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeMessage("dhtVer", dhtVer)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeByteArray("errBytes", errBytes)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeIgniteUuid("futId", futId)) + return false; + + writer.incrementState(); + + case 7: + if (!writer.writeMessage("lockVer", lockVer)) + return false; + + writer.incrementState(); + + case 8: + if (!writer.writeInt("miniId", miniId)) + return false; + + writer.incrementState(); + + case 9: + if (!writer.writeCollection("newDhtNodes", newDhtNodes, MessageCollectionItemType.UUID)) + return false; + + writer.incrementState(); + + case 10: + if (!writer.writeMessage("res", res)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + dhtFutId = reader.readIgniteUuid("dhtFutId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + dhtVer = reader.readMessage("dhtVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + errBytes = reader.readByteArray("errBytes"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + futId = reader.readIgniteUuid("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 7: + lockVer = reader.readMessage("lockVer"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 8: + miniId = reader.readInt("miniId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 9: + newDhtNodes = reader.readCollection("newDhtNodes", MessageCollectionItemType.UUID); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 10: + res = reader.readMessage("res"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(GridNearTxEnlistResponse.class); + } + + /** {@inheritDoc} */ + @Override public short directType() { + return 160; + } + + /** {@inheritDoc} */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + if (err != null && errBytes == null) + errBytes = U.marshal(ctx.marshaller(), err); + + if (res != null) + res.prepareMarshal(cctx); + } + + /** {@inheritDoc} */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + GridCacheContext cctx = ctx.cacheContext(cacheId); + + if (errBytes != null) + err = U.unmarshal(ctx, errBytes, U.resolveClassLoader(ldr, ctx.gridConfig())); + + if (res != null) + res.finishUnmarshal(cctx, ldr); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridNearTxEnlistResponse.class, this); + } +}
