http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java new file mode 100644 index 0000000..f8130e1 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlock.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.CU; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Information about found deadlock. + */ +public class TxDeadlock { + /** Key prefix. */ + private static final String KEY_PREFIX = "K"; + + /** Tx prefix. */ + private static final String TX_PREFIX = "TX"; + + /** Tx locked keys. */ + private final Map<GridCacheVersion, Set<IgniteTxKey>> txLockedKeys; + + /** Tx requested keys. */ + private final Map<IgniteTxKey, Set<GridCacheVersion>> txRequestedKeys; + + /** Cycle. */ + private final List<GridCacheVersion> cycle; + + /** Transactions data: nearNodeId and threadId. */ + private final Map<GridCacheVersion, T2<UUID, Long>> txs; + + /** + * @param cycle Cycle. + * @param txs Transactions. + * @param txLockedKeys Tx locked keys. + * @param txRequestedKeys Tx requested keys. + */ + public TxDeadlock( + List<GridCacheVersion> cycle, + Map<GridCacheVersion, T2<UUID, Long>> txs, + Map<GridCacheVersion, Set<IgniteTxKey>> txLockedKeys, + Map<IgniteTxKey, Set<GridCacheVersion>> txRequestedKeys + ) { + this.cycle = cycle; + this.txLockedKeys = txLockedKeys; + this.txRequestedKeys = txRequestedKeys; + this.txs = txs; + } + + /** + * @return Deadlock represented as cycle of transaction in wait-for-graph. + */ + public List<GridCacheVersion> cycle() { + return cycle; + } + + /** + * @param ctx Context. 
+ */ + public String toString(GridCacheSharedContext ctx) { + assert cycle != null && !cycle.isEmpty(); + + assert cycle.size() >= 3; // At least 2 transactions in cycle and the last is waiting for the first. + + Map<IgniteTxKey, String> keyLabels = U.newLinkedHashMap(cycle.size() - 1); + + Map<GridCacheVersion, String> txLabels = U.newLinkedHashMap(cycle.size() - 1); + + StringBuilder sb = new StringBuilder("\nDeadlock detected:\n\n"); + + for (int i = cycle.size() - 1; i > 0; i--) { + GridCacheVersion txId = cycle.get(i); + + Set<IgniteTxKey> keys = txLockedKeys.get(txId); + + for (IgniteTxKey key : keys) { + Set<GridCacheVersion> txIds = txRequestedKeys.get(key); + + if (txIds == null || txIds.isEmpty()) + continue; + + GridCacheVersion waitsTx = null; + + for (GridCacheVersion ver : txIds) { + if (cycle.contains(ver)) { + waitsTx = ver; + + break; + } + } + + if (waitsTx != null) { + sb.append(label(key, KEY_PREFIX, keyLabels)).append(": ") + .append(label(txId, TX_PREFIX, txLabels)).append(" holds lock, ") + .append(label(waitsTx, TX_PREFIX, txLabels)).append(" waits lock.\n"); + } + } + } + + sb.append("\nTransactions:\n\n"); + + for (Map.Entry<GridCacheVersion, String> e : txLabels.entrySet()) { + T2<UUID, Long> tx = txs.get(e.getKey()); + + sb.append(e.getValue()).append(" [txId=").append(e.getKey()) + .append(", nodeId=").append(tx.get1()).append(", threadId=").append(tx.get2()) + .append("]\n"); + } + + sb.append("\nKeys:\n\n"); + + for (Map.Entry<IgniteTxKey, String> e : keyLabels.entrySet()) { + IgniteTxKey txKey = e.getKey(); + + GridCacheContext cctx = ctx.cacheContext(txKey.cacheId()); + + Object val = CU.value(txKey.key(), cctx, true); + + sb.append(e.getValue()).append(" [key=").append(val).append(", cache=").append(cctx.namexx()).append("]\n"); + } + + return sb.toString(); + } + + /** + * @param id Id. + * @param prefix Prefix. + * @param map Map. 
+ */ + private static <T> String label(T id, String prefix, Map<T, String> map) { + String lb = map.get(id); + + if (lb == null) + map.put(id, lb = prefix + (map.size() + 1)); + + return lb; + } +}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java new file mode 100644 index 0000000..36843dd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxDeadlockDetection.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.ignite.IgniteLogger; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; +import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter; +import org.apache.ignite.internal.util.future.GridFutureAdapter; +import org.apache.ignite.internal.util.typedef.T2; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.jetbrains.annotations.Nullable; + +import static org.apache.ignite.IgniteSystemProperties.IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT; +import static org.apache.ignite.IgniteSystemProperties.getInteger; +import static org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager.DEADLOCK_MAX_ITERS; + +/** + * Transactions deadlock detection. + */ +public class TxDeadlockDetection { + /** Deadlock detection timeout. */ + private static final int DEADLOCK_TIMEOUT = getInteger(IGNITE_TX_DEADLOCK_DETECTION_TIMEOUT, 60000); + + /** Sequence. */ + private static final AtomicLong SEQ = new AtomicLong(); + + /** Cctx. */ + private final GridCacheSharedContext cctx; + + /** Logger. */ + private final IgniteLogger log; + + /** + * @param cctx Context. 
+ */ + public TxDeadlockDetection(GridCacheSharedContext<?, ?> cctx) { + this.cctx = cctx; + this.log = cctx.logger(TxDeadlockDetection.class); + } + + /** + * Detects deadlock starting from given keys. + * + * @param tx Target tx. + * @param keys Keys. + * @return Future that completes with {@link TxDeadlock} if found, otherwise - with {@code null}. + */ + TxDeadlockFuture detectDeadlock(IgniteInternalTx tx, Set<IgniteTxKey> keys) { + GridCacheVersion txId = tx.nearXidVersion(); + + if (log.isDebugEnabled()) { + log.debug("Deadlock detection started " + + "[nodeId=" + cctx.localNodeId() + ", xidVersion=" + txId + ", keys=" + keys + ']'); + } + + TxDeadlockFuture fut = new TxDeadlockFuture(cctx, txId, tx.topologyVersion(), keys); + + fut.init(); + + return fut; + } + + /** + * @param wfg Wait-for-graph. + * @param txId Tx ID - start vertex for cycle search in graph. + * @return Cycle as list of tx IDs if found, otherwise - {@code null}. + */ + static List<GridCacheVersion> findCycle(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion txId) { + if (wfg == null || wfg.isEmpty()) + return null; + + ArrayDeque<GridCacheVersion> stack = new ArrayDeque<>(); + Set<GridCacheVersion> inPath = new HashSet<>(); + Set<GridCacheVersion> visited = new HashSet<>(); + Map<GridCacheVersion, GridCacheVersion> edgeTo = new HashMap<>(); + + stack.push(txId); + + while (!stack.isEmpty()) { + GridCacheVersion v = stack.pop(); + + if (visited.contains(v)) + continue; + + visited.add(v); + + Set<GridCacheVersion> children = wfg.get(v); + + if (children == null || children.isEmpty()) + continue; + + inPath.add(v); + + for (GridCacheVersion w : children) { + if (inPath.contains(w)) { + List<GridCacheVersion> cycle = new ArrayList<>(); + + for (GridCacheVersion x = v; !x.equals(w); x = edgeTo.get(x)) + cycle.add(x); + + cycle.add(w); + cycle.add(v); + + return cycle; + } + + edgeTo.put(w, v); + stack.push(w); + } + } + + return null; + } + + /** + * + */ + static class TxDeadlockFuture extends GridFutureAdapter<TxDeadlock> { + /** Context. 
*/ + private final GridCacheSharedContext cctx; + + /** Future ID. */ + private final long futId = SEQ.incrementAndGet(); + + /** Tx ID. */ + private final GridCacheVersion txId; + + /** Keys. */ + private final Set<IgniteTxKey> keys; + + /** Processed keys. */ + private final Set<IgniteTxKey> processedKeys = new HashSet<>(); + + /** Processed nodes. */ + private final Set<UUID> processedNodes = new HashSet<>(); + + /** Pending keys. */ + private Map<UUID, Set<IgniteTxKey>> pendingKeys = new HashMap<>(); + + /** Nodes queue. */ + private final UniqueDeque<UUID> nodesQueue = new UniqueDeque<>(); + + /** Preferred nodes. */ + private final Set<UUID> preferredNodes = new HashSet<>(); + + /** Tx locked keys. */ + private final Map<GridCacheVersion, Set<IgniteTxKey>> txLockedKeys = new HashMap<>(); + + /** Tx requested keys. */ + private final Map<IgniteTxKey, Set<GridCacheVersion>> txRequestedKeys = new HashMap<>(); + + /** Wait-for-graph. */ + private final Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<>(); + + /** Topology version. */ + private final AffinityTopologyVersion topVer; + + /** Transactions. */ + private final Map<GridCacheVersion, T2<UUID, Long>> txs = new HashMap<>(); + + /** Current processing node ID. */ + private UUID curNodeId; + + /** Iterations count. */ + private int itersCnt; + + /** Timeout object. */ + private DeadlockTimeoutObject timeoutObj; + + /** Timed out flag. */ + private volatile boolean timedOut; + + /** Mutex. */ + private final Object mux = new Object(); + + /** + * @param cctx Context. + * @param txId Tx ID. + * @param topVer Transaction topology version. + * @param keys Keys. 
+ */ + @SuppressWarnings("unchecked") + private TxDeadlockFuture(GridCacheSharedContext cctx, + GridCacheVersion txId, + AffinityTopologyVersion topVer, + Set<IgniteTxKey> keys) { + this.cctx = cctx; + this.txId = txId; + this.topVer = topVer; + this.keys = keys; + + if (DEADLOCK_TIMEOUT > 0) { + timeoutObj = new DeadlockTimeoutObject(); + + cctx.time().addTimeoutObject(timeoutObj); + } + } + + /** + * @return Future ID. + */ + long futureId() { + return futId; + } + + /** + * @param nodeId Node ID. + */ + public void onNodeLeft(UUID nodeId) { + if (compareAndSet(nodeId, null)) { + IgniteLogger log = cctx.logger(TxDeadlockDetection.class); + + if (log.isDebugEnabled()) + log.debug("Failed to finish deadlock detection, node left: " + nodeId); + + onDone(); + } + } + + /** + * Registers this future in tx manager and starts mapping of the initial keys to nodes. + * Completes the future right away, without mapping, if topology version is not available. + */ + private void init() { + cctx.tm().addFuture(this); + + if (topVer == null) { // Tx manager already stopped + onDone(); + + return; // Keys can not be mapped to primary nodes without topology version. + } + + map(keys, Collections.<IgniteTxKey, TxLockList>emptyMap()); + } + + /** + * @param keys Keys. + * @param txLocks Tx locks. + */ + private void map(@Nullable Set<IgniteTxKey> keys, Map<IgniteTxKey, TxLockList> txLocks) { + mapTxKeys(keys, txLocks); + + UUID nodeId = nodesQueue.pollFirst(); + + boolean set = compareAndSet(null, nodeId); + + assert set; + + if (nodeId == null || itersCnt++ >= DEADLOCK_MAX_ITERS || timedOut) + onDone(); + else { + final Set<IgniteTxKey> txKeys = pendingKeys.get(nodeId); + + processedKeys.addAll(txKeys); + processedNodes.add(nodeId); + pendingKeys.remove(nodeId); + + cctx.tm().txLocksInfo(nodeId, this, txKeys); + } + } + + /** + * @param res Response. + */ + private void detect(TxLocksResponse res) { + assert res != null; + + merge(res); + + updateWaitForGraph(res.txLocks()); + + List<GridCacheVersion> cycle = findCycle(wfg, txId); + + if (cycle != null) + onDone(new TxDeadlock(cycle, txs, txLockedKeys, txRequestedKeys)); + else + map(res.keys(), res.txLocks()); + } + + /** + * Maps tx keys on nodes. 
Key can be mapped on some node if this node is primary for given key or + * node is near for transaction that holds or requests lock for key. + * + * Key will not be mapped to node if both key and node are already handled. + * + * @param txKeys Tx keys. + * @param txLocks Tx locks. + */ + @SuppressWarnings("ForLoopReplaceableByForEach") + private void mapTxKeys(@Nullable Set<IgniteTxKey> txKeys, Map<IgniteTxKey, TxLockList> txLocks) { + for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) { + List<TxLock> locks = e.getValue().txLocks(); + + for (int i = 0; i < locks.size(); i++) { + TxLock txLock = locks.get(i); + + UUID nearNodeId = txLock.nearNodeId(); + + IgniteTxKey txKey = e.getKey(); + + if (processedKeys.contains(txKey) && processedNodes.contains(nearNodeId)) + continue; + + if (txLock.requested()) { + UUID nodeId = primary(txKey); + + // Process this node earlier than other in order to optimize amount of requests. + preferredNodes.add(nodeId); + + Set<IgniteTxKey> mappedKeys = pendingKeys.get(nodeId); + + if (mappedKeys == null) + pendingKeys.put(nodeId, mappedKeys = new HashSet<>()); + + mappedKeys.add(txKey); + } + else { + if (txLock.owner()) { + if (!preferredNodes.contains(nearNodeId)) + nodesQueue.addFirst(nearNodeId); + } + else + nodesQueue.addLast(nearNodeId); + + Set<IgniteTxKey> mappedKeys = pendingKeys.get(nearNodeId); + + if (mappedKeys == null) + pendingKeys.put(nearNodeId, mappedKeys = new HashSet<>()); + + mappedKeys.add(txKey); + } + } + } + + for (UUID nodeId : preferredNodes) + nodesQueue.addFirst(nodeId); + + preferredNodes.clear(); + + if (txKeys != null) { + for (IgniteTxKey txKey : txKeys) { + UUID nodeId = primary(txKey); + + if (processedKeys.contains(txKey) && processedNodes.contains(nodeId)) + continue; + + nodesQueue.addLast(nodeId); + + Set<IgniteTxKey> mappedKeys = pendingKeys.get(nodeId); + + if (mappedKeys == null) + pendingKeys.put(nodeId, mappedKeys = new HashSet<>()); + + mappedKeys.add(txKey); + } + } + } + 
+ /** + * @param txKey Tx key. + * @return Primary node ID. + */ + private UUID primary(IgniteTxKey txKey) { + GridCacheContext ctx = cctx.cacheContext(txKey.cacheId()); + + ClusterNode node = ctx.affinity().primary(txKey.key(), topVer); + + assert node != null : topVer; + + return node.id(); + } + + /** + * @param res Tx locks. + */ + private void merge(TxLocksResponse res) { + Map<IgniteTxKey, TxLockList> txLocks = res.txLocks(); + + if (txLocks == null || txLocks.isEmpty()) + return; + + for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) { + IgniteTxKey txKey = e.getKey(); + + TxLockList lockList = e.getValue(); + + if (lockList != null && !lockList.isEmpty()) { + for (TxLock lock : lockList.txLocks()) { + if (lock.owner() || lock.candiate()) { + if (txs.get(lock.txId()) == null) + txs.put(lock.txId(), new T2<>(lock.nearNodeId(), lock.threadId())); + } + + if (lock.owner()) { + GridCacheVersion txId = lock.txId(); + + Set<IgniteTxKey> keys = txLockedKeys.get(txId); + + if (keys == null) + txLockedKeys.put(txId, keys = new HashSet<>()); + + keys.add(txKey); + } + else if (lock.candiate()) { + Set<GridCacheVersion> txs = txRequestedKeys.get(txKey); + + if (txs == null) + txRequestedKeys.put(txKey, txs = new HashSet<>()); + + txs.add(lock.txId()); + } + } + } + } + } + + /** + * @param txLocks Tx locks. 
+ */ + private void updateWaitForGraph(Map<IgniteTxKey, TxLockList> txLocks) { + for (Map.Entry<IgniteTxKey, TxLockList> e : txLocks.entrySet()) { + + GridCacheVersion txOwner = null; + + for (TxLock lock : e.getValue().txLocks()) { + if (lock.owner()) { + assert txOwner == null; + + txOwner = lock.txId(); + + if (keys.contains(e.getKey()) && !txId.equals(lock.txId())) { + Set<GridCacheVersion> waitingTxs = wfg.get(txId); + + if (waitingTxs == null) + wfg.put(txId, waitingTxs = new HashSet<>()); + + waitingTxs.add(lock.txId()); + } + + continue; + } + + if (lock.candiate()) { + GridCacheVersion txId0 = lock.txId(); + + Set<GridCacheVersion> waitForTxs = wfg.get(txId0); + + if (waitForTxs == null) + wfg.put(txId0, waitForTxs = new HashSet<>()); + + waitForTxs.add(txOwner); + } + } + } + } + + /** + * @param res Response. + */ + public void onResult(UUID nodeId, TxLocksResponse res) { + boolean set = compareAndSet(nodeId, null); + + if (res != null && set) { + if (res.classError() != null) { + IgniteLogger log = cctx.logger(TxDeadlockDetection.class); + + log.warning("Failed to finish deadlock detection due to an error: " + nodeId); + + onDone(); + } + else + detect(res); + } + else + onDone(); + } + + /** + * @param exp Expected. + * @param val Value. + */ + private boolean compareAndSet(UUID exp, UUID val) { + synchronized (mux) { + if (Objects.equals(curNodeId, exp)) { + curNodeId = val; + + return true; + } + } + + return false; + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable TxDeadlock res, @Nullable Throwable err) { + if (super.onDone(res, err)) { + cctx.tm().removeFuture(futId); + + if (timeoutObj != null) + cctx.time().removeTimeoutObject(timeoutObj); + + return true; + } + + return false; + } + + /** + * Lock request timeout object. + */ + private class DeadlockTimeoutObject extends GridTimeoutObjectAdapter { + /** + * Default constructor. 
+ */ + DeadlockTimeoutObject() { + super(DEADLOCK_TIMEOUT); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + timedOut = true; + + onDone(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(DeadlockTimeoutObject.class, this); + } + } + } + + /** + * Deque with Set semantic. + * Only overridden methods can be used. + */ + private static class UniqueDeque<E> extends ArrayDeque<E> { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Items. */ + private final Set<E> items = new HashSet<>(); + + /** {@inheritDoc} */ + @Override public void addFirst(E e) { + boolean contains, first = false; + + if ((contains = items.contains(e)) && !(first = getFirst().equals(e))) + remove(e); + + if (!contains) + items.add(e); + + if (!first) + super.addFirst(e); + } + + /** {@inheritDoc} */ + @Override public void addLast(E e) { + if (!items.contains(e)) { + super.addLast(e); + + items.add(e); + } + } + + /** {@inheritDoc} */ + @Override public E pollFirst() { + E e = super.pollFirst(); + + items.remove(e); + + return e; + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLock.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLock.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLock.java new file mode 100644 index 0000000..af85735 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLock.java @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.nio.ByteBuffer; +import java.util.UUID; +import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Corresponds to one {@link GridCacheMvccCandidate} from local MVCC candidates queue. + * There is one exclusion: {@link TxLock} instance with {@link #OWNERSHIP_REQUESTED} corresponds to lock request + * to remote node from near node that isn't primary node for key. + */ +public class TxLock implements Message { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Ownership owner. */ + static final byte OWNERSHIP_OWNER = 1; + + /** Ownership candidate. */ + static final byte OWNERSHIP_CANDIDATE = 2; + + /** Ownership requested. */ + static final byte OWNERSHIP_REQUESTED = 3; + + /** Near node ID. */ + private UUID nearNodeId; + + /** Tx ID. */ + private GridCacheVersion txId; + + /** Thread ID. */ + private long threadId; + + /** Ownership. */ + private byte ownership; + + /** + * Default constructor. 
+ */ + public TxLock() { + // No-op. + } + + /** + * @param txId Tx ID. + * @param nearNodeId Near node ID. + * @param threadId Thread ID. + * @param ownership Ownership. + */ + public TxLock(GridCacheVersion txId, UUID nearNodeId, long threadId, byte ownership) { + this.txId = txId; + this.nearNodeId = nearNodeId; + this.threadId = threadId; + this.ownership = ownership; + } + + /** + * @return Near node ID. + */ + public UUID nearNodeId() { + return nearNodeId; + } + + /** + * @return Transaction ID. + */ + public GridCacheVersion txId() { + return txId; + } + + /** + * @return Thread ID. + */ + public long threadId() { + return threadId; + } + + /** + * @return {@code True} if transaction hold lock on the key, otherwise {@code false}. + */ + public boolean owner() { + return ownership == OWNERSHIP_OWNER; + } + + /** + * @return {@code True} if there is MVCC candidate for this transaction and key, otherwise {@code false}. + */ + public boolean candiate() { + return ownership == OWNERSHIP_CANDIDATE; + } + + /** + * @return {@code True} if transaction requested lock for key from primary remote node + * but response isn't received because other transaction hold lock on the key. 
+ */ + public boolean requested() { + return ownership == OWNERSHIP_REQUESTED; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxLock.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeUuid("nearNodeId", nearNodeId)) + return false; + + writer.incrementState(); + + case 1: + if (!writer.writeByte("ownership", ownership)) + return false; + + writer.incrementState(); + + case 2: + if (!writer.writeLong("threadId", threadId)) + return false; + + writer.incrementState(); + + case 3: + if (!writer.writeMessage("txId", txId)) + return false; + + writer.incrementState(); + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + nearNodeId = reader.readUuid("nearNodeId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 1: + ownership = reader.readByte("ownership"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 2: + threadId = reader.readLong("threadId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 3: + txId = reader.readMessage("txId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + } + + return reader.afterMessageRead(TxLock.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -25; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 4; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java new file mode 100644 index 0000000..8b1c2dd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLockList.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import org.apache.ignite.internal.GridDirectCollection; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * List of transaction locks for particular key. + */ +public class TxLockList implements Message { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Tx locks. */ + @GridToStringInclude + @GridDirectCollection(value = TxLock.class) + private List<TxLock> txLocks = new ArrayList<>(); + + /** + * Default constructor. + */ + public TxLockList() { + // No-op. + } + + /** + * @return Lock list. + */ + public List<TxLock> txLocks() { + return txLocks; + } + + /** + * @param txLock Tx lock. + */ + public void add(TxLock txLock) { + txLocks.add(txLock); + } + + /** + * @return {@code True} if lock list is empty. 
+ */ + public boolean isEmpty() { + return txLocks.isEmpty(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxLockList.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 0: + if (!writer.writeCollection("txLocks", txLocks, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + switch (reader.state()) { + case 0: + txLocks = reader.readCollection("txLocks", MessageCollectionItemType.MSG); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(TxLockList.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -26; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 1; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. 
+ } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java new file mode 100644 index 0000000..20afcc6 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksRequest.java @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.A; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Transactions lock list request. Carries a future ID and a set of target tx keys. + */ +public class TxLocksRequest extends GridCacheMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private long futId; + + /** Tx keys. Not written by {@link #writeTo}; rebuilt from {@link #txKeysArr} in {@link #finishUnmarshal}. */ + @GridToStringInclude + @GridDirectTransient + private Set<IgniteTxKey> txKeys; + + /** Array copy of {@link #txKeys}. Filled in {@link #prepareMarshal} and cleared in {@link #finishUnmarshal}. */ + @GridToStringExclude + private IgniteTxKey[] txKeysArr; + + /** + * Default constructor. + */ + public TxLocksRequest() { + // No-op. + } + + /** + * @param futId Future ID. + * @param txKeys Target tx keys, checked with {@code A.notEmpty}. + */ + public TxLocksRequest(long futId, Set<IgniteTxKey> txKeys) { + A.notEmpty(txKeys, "txKeys"); + + this.futId = futId; + this.txKeys = txKeys; + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @return Tx keys. 
+ */ + public Collection<IgniteTxKey> txKeys() { + return txKeys; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxLocksRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} Also prepares every key and copies {@link #txKeys} into {@link #txKeysArr}. */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + txKeysArr = new IgniteTxKey[txKeys.size()]; + + int i = 0; + + for (IgniteTxKey key : txKeys) { + key.prepareMarshal(ctx.cacheContext(key.cacheId())); + + txKeysArr[i++] = key; + } + } + + /** {@inheritDoc} Also finishes every key, rebuilds {@link #txKeys} and drops {@link #txKeysArr}. */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + txKeys = U.newHashSet(txKeysArr.length); + + for (IgniteTxKey key : txKeysArr) { + key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); + + txKeys.add(key); + } + + txKeysArr = null; + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if (!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeObjectArray("txKeysArr", txKeysArr, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + 
reader.incrementState(); + + case 4: + txKeysArr = reader.readObjectArray("txKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(TxLocksRequest.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -24; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 5; + } + + /** {@inheritDoc} */ + @Override public void onAckReceived() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java new file mode 100644 index 0000000..fa6afdd --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/transactions/TxLocksResponse.java @@ -0,0 +1,318 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.GridDirectTransient; +import org.apache.ignite.internal.processors.cache.GridCacheMessage; +import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; +import org.apache.ignite.internal.util.tostring.GridToStringExclude; +import org.apache.ignite.internal.util.tostring.GridToStringInclude; +import org.apache.ignite.internal.util.typedef.internal.S; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType; +import org.apache.ignite.plugin.extensions.communication.MessageReader; +import org.apache.ignite.plugin.extensions.communication.MessageWriter; + +/** + * Transactions lock list response. Carries a future ID, lock lists per near tx key and a set of remote tx keys. + */ +public class TxLocksResponse extends GridCacheMessage { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** Future ID. */ + private long futId; + + /** Locks for near txKeys of near transactions. */ + @GridToStringInclude + @GridDirectTransient + private Map<IgniteTxKey, TxLockList> nearTxKeyLocks = new HashMap<>(); + + /** Remote keys involved in transactions. Doesn't include near keys. */ + @GridToStringInclude + @GridDirectTransient + private Set<IgniteTxKey> txKeys; + + /** Array of keys from {@link #nearTxKeyLocks}. Used during marshalling and unmarshalling. */ + @GridToStringExclude + private IgniteTxKey[] nearTxKeysArr; + + /** Array of txKeys from {@link #txKeys}. Used during marshalling and unmarshalling. */ + @GridToStringExclude + private IgniteTxKey[] txKeysArr; + + /** Array of lock lists from {@link #nearTxKeyLocks}. Used during marshalling and unmarshalling. 
*/ + @GridToStringExclude + private TxLockList[] locksArr; + + /** + * Default constructor. + */ + public TxLocksResponse() { + // No-op. + } + + /** + * @return Future ID. + */ + public long futureId() { + return futId; + } + + /** + * @param futId Future ID. + */ + public void futureId(long futId) { + this.futId = futId; + } + + /** + * @return Lock lists for all near tx keys. + */ + public Map<IgniteTxKey, TxLockList> txLocks() { + return nearTxKeyLocks; + } + + /** + * @param txKey Tx key. + * @return Lock list for given tx key, or {@code null} if the key has no lock list. + */ + public TxLockList txLocks(IgniteTxKey txKey) { + return nearTxKeyLocks.get(txKey); + } + + /** + * @param txKey Tx key. + * @param txLock Tx lock to append to the key's lock list (the list is created on first use). + */ + public void addTxLock(IgniteTxKey txKey, TxLock txLock) { + TxLockList lockList = nearTxKeyLocks.get(txKey); + + if (lockList == null) + nearTxKeyLocks.put(txKey, lockList = new TxLockList()); + + lockList.add(txLock); + } + + /** + * @return Remote tx keys involved in tx, may be {@code null} if no key was added. + */ + public Set<IgniteTxKey> keys() { + return txKeys; + } + + /** + * @param key Key. 
+ */ + public void addKey(IgniteTxKey key) { + if (txKeys == null) + txKeys = new HashSet<>(); + + txKeys.add(key); + } + + /** {@inheritDoc} */ + @Override public boolean addDeploymentInfo() { + return addDepInfo; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(TxLocksResponse.class, this); + } + + /** {@inheritDoc} Also prepares every key and fills the marshalling arrays from the non-empty collections. */ + @Override public void prepareMarshal(GridCacheSharedContext ctx) throws IgniteCheckedException { + super.prepareMarshal(ctx); + + if (nearTxKeyLocks != null && !nearTxKeyLocks.isEmpty()) { + int len = nearTxKeyLocks.size(); + + nearTxKeysArr = new IgniteTxKey[len]; + locksArr = new TxLockList[len]; + + int i = 0; + + for (Map.Entry<IgniteTxKey, TxLockList> entry : nearTxKeyLocks.entrySet()) { + IgniteTxKey key = entry.getKey(); + + key.prepareMarshal(ctx.cacheContext(key.cacheId())); + + nearTxKeysArr[i] = key; + locksArr[i] = entry.getValue(); + + i++; + } + } + + if (txKeys != null && !txKeys.isEmpty()) { + txKeysArr = new IgniteTxKey[txKeys.size()]; + + int i = 0; + + for (IgniteTxKey key : txKeys) { + key.prepareMarshal(ctx.cacheContext(key.cacheId())); + + txKeysArr[i++] = key; + } + } + } + + /** {@inheritDoc} Also finishes every key, refills the collections from the non-null arrays and drops them. */ + @Override public void finishUnmarshal(GridCacheSharedContext ctx, ClassLoader ldr) throws IgniteCheckedException { + super.finishUnmarshal(ctx, ldr); + + if (nearTxKeysArr != null) { + for (int i = 0; i < nearTxKeysArr.length; i++) { + IgniteTxKey key = nearTxKeysArr[i]; + + key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); + + txLocks().put(key, locksArr[i]); + } + + nearTxKeysArr = null; + locksArr = null; + } + + if (txKeysArr != null) { + txKeys = U.newHashSet(txKeysArr.length); + + for (IgniteTxKey key : txKeysArr) { + key.finishUnmarshal(ctx.cacheContext(key.cacheId()), ldr); + + txKeys.add(key); + } + + txKeysArr = null; + } + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) { + writer.setBuffer(buf); + + if 
(!super.writeTo(buf, writer)) + return false; + + if (!writer.isHeaderWritten()) { + if (!writer.writeHeader(directType(), fieldsCount())) + return false; + + writer.onHeaderWritten(); + } + + switch (writer.state()) { + case 3: + if (!writer.writeLong("futId", futId)) + return false; + + writer.incrementState(); + + case 4: + if (!writer.writeObjectArray("locksArr", locksArr, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 5: + if (!writer.writeObjectArray("nearTxKeysArr", nearTxKeysArr, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + case 6: + if (!writer.writeObjectArray("txKeysArr", txKeysArr, MessageCollectionItemType.MSG)) + return false; + + writer.incrementState(); + + } + + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) { + reader.setBuffer(buf); + + if (!reader.beforeMessageRead()) + return false; + + if (!super.readFrom(buf, reader)) + return false; + + switch (reader.state()) { + case 3: + futId = reader.readLong("futId"); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 4: + locksArr = reader.readObjectArray("locksArr", MessageCollectionItemType.MSG, TxLockList.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 5: + nearTxKeysArr = reader.readObjectArray("nearTxKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + case 6: + txKeysArr = reader.readObjectArray("txKeysArr", MessageCollectionItemType.MSG, IgniteTxKey.class); + + if (!reader.isLastRead()) + return false; + + reader.incrementState(); + + } + + return reader.afterMessageRead(TxLocksResponse.class); + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return -23; + } + + /** {@inheritDoc} */ + @Override public byte fieldsCount() { + return 7; + } + + /** {@inheritDoc} */ + 
@Override public void onAckReceived() { + // No-op. + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java index 6d9f574..ab2ba9d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java @@ -215,6 +215,7 @@ import org.apache.ignite.spi.IgniteSpi; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.discovery.DiscoverySpi; import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport; +import org.apache.ignite.transactions.TransactionDeadlockException; import org.apache.ignite.transactions.TransactionHeuristicException; import org.apache.ignite.transactions.TransactionOptimisticException; import org.apache.ignite.transactions.TransactionRollbackException; @@ -806,6 +807,9 @@ public abstract class IgniteUtils { m.put(IgniteTxTimeoutCheckedException.class, new C1<IgniteCheckedException, IgniteException>() { @Override public IgniteException apply(IgniteCheckedException e) { + if (e.getCause() instanceof TransactionDeadlockException) + return new TransactionTimeoutException(e.getMessage(), e.getCause()); + return new TransactionTimeoutException(e.getMessage(), e); } }); http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java index 6c4e894..e2e7100 100644 --- 
a/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/Transaction.java @@ -25,7 +25,7 @@ import org.apache.ignite.lang.IgniteAsyncSupported; import org.apache.ignite.lang.IgniteUuid; /** - * Grid cache transaction. Cache transactions have a default 2PC (two-phase-commit) behavior and + * Ignite cache transaction. Cache transactions have a default 2PC (two-phase-commit) behavior and * can be plugged into ongoing {@code JTA} transaction by properly implementing * {@ignitelink org.apache.ignite.cache.jta.CacheTmLookup} * interface. Cache transactions can also be started explicitly directly from {@link IgniteTransactions} API @@ -96,17 +96,19 @@ import org.apache.ignite.lang.IgniteUuid; * <h1 class="header">Usage</h1> * You can use cache transactions as follows: * <pre name="code" class="java"> - * Cache<String, Integer> cache = Ignition.ignite().cache(); + * Ignite ignite = Ignition.ignite(); * - * try (GridCacheTx tx = cache.txStart()) { + * IgniteCache<String, Integer> cache = ignite.cache(cacheName); + * + * try (Transaction tx = ignite.transactions().txStart()) { * // Perform transactional operations. * Integer v1 = cache.get("k1"); * * // Check if v1 satisfies some condition before doing a put. * if (v1 != null && v1 > 0) - * cache.putx("k1", 2); + * cache.put("k1", 2); * - * cache.removex("k2"); + * cache.remove("k2"); * * // Commit the transaction. * tx.commit(); @@ -188,7 +190,7 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport { /** * Gets timeout value in milliseconds for this transaction. If transaction times - * out prior to it's completion, {@link org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException} will be thrown. + * out prior to its completion, {@link org.apache.ignite.transactions.TransactionTimeoutException} will be thrown. * * @return Transaction timeout value. 
*/ @@ -225,6 +227,11 @@ public interface Transaction extends AutoCloseable, IgniteAsyncSupport { * Commits this transaction by initiating {@code two-phase-commit} process. * * @throws IgniteException If commit failed. + * @throws TransactionTimeoutException If transaction is timed out. + * @throws TransactionRollbackException If transaction is automatically rolled back. + * @throws TransactionOptimisticException If transaction concurrency is {@link TransactionConcurrency#OPTIMISTIC} + * and commit is optimistically failed. + * @throws TransactionHeuristicException If transaction has entered an unknown state. */ @IgniteAsyncSupported public void commit() throws IgniteException; http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/transactions/TransactionDeadlockException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionDeadlockException.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionDeadlockException.java new file mode 100644 index 0000000..4dc13fe --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionDeadlockException.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.transactions; + +import org.apache.ignite.IgniteException; + +/** + * Transaction deadlock exception. + * <p> + * This exception can be thrown from any cache method that modifies or reads data within a transaction + * (explicit or implicit) with a timeout when deadlock detection is enabled (enabled by default). + * <p> + * Usually this exception is the cause of a {@link TransactionTimeoutException}. + */ +public class TransactionDeadlockException extends IgniteException { + /** Serial version UID. */ + private static final long serialVersionUID = 0L; + + /** + * Creates a new deadlock exception with the given error message. + * + * @param msg Error message. + */ + public TransactionDeadlockException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/main/java/org/apache/ignite/transactions/TransactionTimeoutException.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionTimeoutException.java b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionTimeoutException.java index 970672e..ab76cf7 100644 --- a/modules/core/src/main/java/org/apache/ignite/transactions/TransactionTimeoutException.java +++ b/modules/core/src/main/java/org/apache/ignite/transactions/TransactionTimeoutException.java @@ -20,7 +20,8 @@ package org.apache.ignite.transactions; import org.apache.ignite.IgniteException; /** - * Exception thrown whenever grid transactions time out. + * Exception thrown whenever transactions time out. Because transaction can be timed out due to a deadlock + * this exception can contain {@link TransactionDeadlockException} as cause. 
*/ public class TransactionTimeoutException extends IgniteException { /** */ @@ -39,7 +40,7 @@ public class TransactionTimeoutException extends IgniteException { * Creates new timeout exception with given error message and optional nested exception. * * @param msg Error message. - * @param cause Optional nested exception (can be <tt>null</tt>). + * @param cause Optional nested exception (can be {@code null}). */ public TransactionTimeoutException(String msg, Throwable cause) { super(msg, cause); http://git-wip-us.apache.org/repos/asf/ignite/blob/8f7e6504/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java new file mode 100644 index 0000000..20467c2 --- /dev/null +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/transactions/DepthFirstSearchTest.java @@ -0,0 +1,252 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.cache.transactions; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import junit.framework.TestCase; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.typedef.F; + +import static org.apache.ignite.internal.processors.cache.transactions.TxDeadlockDetection.findCycle; + +/** + * Tests the depth-first search for a cycle in a wait-for-graph ({@code TxDeadlockDetection.findCycle}). + */ +public class DepthFirstSearchTest extends TestCase { + /** Tx 1. */ + private static final GridCacheVersion T1 = new GridCacheVersion(1, 0, 0, 0); + + /** Tx 2. */ + private static final GridCacheVersion T2 = new GridCacheVersion(2, 0, 0, 0); + + /** Tx 3. */ + private static final GridCacheVersion T3 = new GridCacheVersion(3, 0, 0, 0); + + /** Tx 4. */ + private static final GridCacheVersion T4 = new GridCacheVersion(4, 0, 0, 0); + + /** Tx 5. */ + private static final GridCacheVersion T5 = new GridCacheVersion(5, 0, 0, 0); + + /** Tx 6. */ + private static final GridCacheVersion T6 = new GridCacheVersion(6, 0, 0, 0); + + /** All transactions. */ + private static final List<GridCacheVersion> ALL = Arrays.asList(T1, T2, T3, T4, T5, T6); + + + /** + * @throws Exception If failed. 
+ */ + public void testNoCycle() throws Exception { + assertNull(findCycle(Collections.<GridCacheVersion, Set<GridCacheVersion>>emptyMap(), T1)); + + HashMap<GridCacheVersion, Set<GridCacheVersion>> wfg; + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, null); + }}; + + assertAllNull(wfg); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, null); + put(T2, null); + }}; + + assertAllNull(wfg); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T3, Collections.singleton(T4)); + }}; + + assertAllNull(wfg); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, new HashSet<GridCacheVersion>(){{add(T2);}}); + put(T2, new HashSet<GridCacheVersion>(){{add(T3);}}); + put(T4, new HashSet<GridCacheVersion>(){{add(T1); add(T3);}}); + }}; + + assertAllNull(wfg); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, new HashSet<GridCacheVersion>(){{add(T2);}}); + put(T3, new HashSet<GridCacheVersion>(){{add(T4);}}); + put(T4, new HashSet<GridCacheVersion>(){{add(T1);}}); + }}; + + assertAllNull(wfg); + } + + /** + * @throws Exception If failed. 
+ */ + public void testFindCycle2() throws Exception { + Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, Collections.singleton(T1)); + }}; + + assertEquals(F.asList(T2, T1, T2), findCycle(wfg, T1)); + assertEquals(F.asList(T1, T2, T1), findCycle(wfg, T2)); + assertAllNull(wfg, T1, T2); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, Collections.singleton(T3)); + put(T3, asLinkedHashSet(T2, T4)); + }}; + + assertEquals(F.asList(T3, T2, T3), findCycle(wfg, T1)); + assertEquals(F.asList(T3, T2, T3), findCycle(wfg, T2)); + assertEquals(F.asList(T2, T3, T2), findCycle(wfg, T3)); + assertAllNull(wfg, T1, T2, T3); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, asLinkedHashSet(T3, T1)); + put(T3, Collections.singleton(T2)); + }}; + + assertEquals(F.asList(T2, T1, T2), findCycle(wfg, T1)); + assertEquals(F.asList(T1, T2, T1), findCycle(wfg, T2)); + assertEquals(F.asList(T2, T3, T2), findCycle(wfg, T3)); + assertAllNull(wfg, T1, T2, T3); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, asLinkedHashSet(T1, T3)); + put(T3, Collections.singleton(T4)); + put(T4, Collections.singleton(T3)); + }}; + + assertEquals(F.asList(T2, T1, T2), findCycle(wfg, T1)); + assertEquals(F.asList(T4, T3, T4), findCycle(wfg, T2)); + assertEquals(F.asList(T4, T3, T4), findCycle(wfg, T3)); + assertEquals(F.asList(T3, T4, T3), findCycle(wfg, T4)); + assertAllNull(wfg, T1, T2, T3, T4); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, Collections.singleton(T3)); + put(T3, Collections.singleton(T4)); + put(T4, Collections.singleton(T5)); + put(T5, Collections.singleton(T6)); + put(T6, Collections.singleton(T5)); + }}; + + 
assertEquals(F.asList(T6, T5, T6), findCycle(wfg, T1)); + assertEquals(F.asList(T6, T5, T6), findCycle(wfg, T2)); + assertEquals(F.asList(T6, T5, T6), findCycle(wfg, T3)); + assertEquals(F.asList(T6, T5, T6), findCycle(wfg, T4)); + assertEquals(F.asList(T6, T5, T6), findCycle(wfg, T5)); + assertEquals(F.asList(T5, T6, T5), findCycle(wfg, T6)); + } + + /** + * @throws Exception If failed. + */ + public void testFindCycle3() throws Exception { + Map<GridCacheVersion, Set<GridCacheVersion>> wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, Collections.singleton(T3)); + put(T3, Collections.singleton(T1)); + }}; + + assertEquals(F.asList(T3, T2, T1, T3), findCycle(wfg, T1)); + assertEquals(F.asList(T1, T3, T2, T1), findCycle(wfg, T2)); + assertEquals(F.asList(T2, T1, T3, T2), findCycle(wfg, T3)); + assertAllNull(wfg, T1, T2, T3); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, Collections.singleton(T3)); + put(T3, Collections.singleton(T4)); + put(T4, asLinkedHashSet(T2, T5)); + }}; + + assertEquals(F.asList(T4, T3, T2, T4), findCycle(wfg, T1)); + assertEquals(F.asList(T4, T3, T2, T4), findCycle(wfg, T2)); + assertEquals(F.asList(T2, T4, T3, T2), findCycle(wfg, T3)); + assertEquals(F.asList(T3, T2, T4, T3), findCycle(wfg, T4)); + assertAllNull(wfg, T1, T2, T3, T4); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, Collections.singleton(T2)); + put(T2, asLinkedHashSet(T3, T4)); + put(T3, Collections.singleton(T1)); + put(T4, Collections.singleton(T5)); + put(T5, Collections.singleton(T6)); + put(T6, Collections.singleton(T4)); + + }}; + + assertEquals(F.asList(T6, T5, T4, T6), findCycle(wfg, T1)); + assertEquals(F.asList(T6, T5, T4, T6), findCycle(wfg, T2)); + assertEquals(F.asList(T2, T1, T3, T2), findCycle(wfg, T3)); + + wfg = new HashMap<GridCacheVersion, Set<GridCacheVersion>>() {{ + put(T1, 
Collections.singleton(T2)); + put(T2, Collections.singleton(T3)); + put(T3, Collections.singleton(T4)); + put(T4, Collections.singleton(T5)); + put(T5, Collections.singleton(T6)); + put(T6, Collections.singleton(T4)); + }}; + + assertEquals(F.asList(T6, T5, T4, T6), findCycle(wfg, T1)); + assertEquals(F.asList(T6, T5, T4, T6), findCycle(wfg, T2)); + assertEquals(F.asList(T6, T5, T4, T6), findCycle(wfg, T3)); + assertEquals(F.asList(T6, T5, T4, T6), findCycle(wfg, T4)); + assertEquals(F.asList(T4, T6, T5, T4), findCycle(wfg, T5)); + assertEquals(F.asList(T5, T4, T6, T5), findCycle(wfg, T6)); + + } + + /** + * @param wfg Wait-for-graph; every tx from {@code ALL} not listed in {@code ignore} must yield no cycle. + */ + private static void assertAllNull(Map<GridCacheVersion, Set<GridCacheVersion>> wfg, GridCacheVersion... ignore) { + Set<GridCacheVersion> excl = F.asSet(ignore); + + for (GridCacheVersion tx : ALL) { + if (!excl.contains(tx)) + assertNull(tx + " could not be part of cycle", findCycle(wfg, tx)); + } + } + + /** + * @param txs Txs to collect, in the given order, into a new {@link LinkedHashSet}. + */ + private static Set<GridCacheVersion> asLinkedHashSet(GridCacheVersion... txs) { + Set<GridCacheVersion> set = new LinkedHashSet<>(); + + Collections.addAll(set, txs); + + return set; + } +} \ No newline at end of file
