Repository: ignite Updated Branches: refs/heads/ignite-8446 7f89fa52f -> cc8591a1a
IGNITE-8808 Improve control.sh --tx command to show local and remote transactions. - Fixes #4209. Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d305c663 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d305c663 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d305c663 Branch: refs/heads/ignite-8446 Commit: d305c6634032c152fd2432a31f465e0083c715a2 Parents: cf09e76 Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com> Authored: Tue Jun 19 20:12:13 2018 +0300 Committer: Alexey Goncharuk <alexey.goncha...@gmail.com> Committed: Tue Jun 19 20:12:13 2018 +0300 ---------------------------------------------------------------------- .../internal/commandline/CommandHandler.java | 18 +- .../ignite/internal/visor/tx/VisorTxInfo.java | 41 ++- .../ignite/internal/visor/tx/VisorTxTask.java | 212 +++++++++++-- .../internal/TestRecordingCommunicationSpi.java | 42 ++- .../ignite/util/GridCommandHandlerTest.java | 303 ++++++++++++++++++- 5 files changed, 559 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java index ed85f0c..cf05699 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java @@ -1088,11 +1088,19 @@ public class CommandHandler { ", concurrency=" + info.getConcurrency() + ", timeout=" + info.getTimeout() + ", size=" + info.getSize() + - ", dhtNodes=" + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() { - @Override public String apply(UUID id) { - return U.id8(id); - } - }) + + ", dhtNodes=" + (info.getPrimaryNodes() == null ? "N/A" : + F.transform(info.getPrimaryNodes(), new IgniteClosure<UUID, String>() { + @Override public String apply(UUID id) { + return U.id8(id); + } + })) + + ", nearXid=" + info.getNearXid() + + ", parentNodeIds=" + (info.getMasterNodeIds() == null ? "N/A" : + F.transform(info.getMasterNodeIds(), new IgniteClosure<UUID, String>() { + @Override public String apply(UUID id) { + return U.id8(id); + } + })) + ']'); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java index ecf3e0d..03de5b0 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java @@ -33,6 +33,7 @@ import org.apache.ignite.lang.IgniteUuid; import org.apache.ignite.transactions.TransactionConcurrency; import org.apache.ignite.transactions.TransactionIsolation; import org.apache.ignite.transactions.TransactionState; +import org.jetbrains.annotations.Nullable; /** */ @@ -75,6 +76,12 @@ public class VisorTxInfo extends VisorDataTransferObject { /** */ private int size; + /** */ + private IgniteUuid nearXid; + + /** */ + private Collection<UUID> masterNodeIds; + /** * Default constructor. */ @@ -96,7 +103,7 @@ public class VisorTxInfo extends VisorDataTransferObject { */ public VisorTxInfo(IgniteUuid xid, long startTime, long duration, TransactionIsolation isolation, TransactionConcurrency concurrency, long timeout, String lb, Collection<UUID> primaryNodes, - TransactionState state, int size) { + TransactionState state, int size, IgniteUuid nearXid, Collection<UUID> masterNodeIds) { this.xid = xid; this.startTime = startTime; this.duration = duration; @@ -107,6 +114,13 @@ public class VisorTxInfo extends VisorDataTransferObject { this.primaryNodes = primaryNodes; this.state = state; this.size = size; + this.nearXid = nearXid; + this.masterNodeIds = masterNodeIds; + } + + /** {@inheritDoc} */ + @Override public byte getProtocolVersion() { + return V2; } /** */ @@ -164,9 +178,14 @@ public class VisorTxInfo extends VisorDataTransferObject { return size; } - /** {@inheritDoc} */ - @Override public byte getProtocolVersion() { - return V2; + /** */ + public @Nullable IgniteUuid getNearXid() { + return nearXid; + } + + /** */ + public @Nullable Collection<UUID> getMasterNodeIds() { + return masterNodeIds; } /** {@inheritDoc} */ @@ -180,12 +199,13 @@ public class VisorTxInfo extends VisorDataTransferObject { U.writeCollection(out, primaryNodes); U.writeEnum(out, state); out.writeInt(size); + U.writeGridUuid(out, nearXid); + U.writeCollection(out, masterNodeIds); out.writeLong(startTime); } /** {@inheritDoc} */ - @Override protected void readExternalData(byte protoVer, - ObjectInput in) throws IOException, ClassNotFoundException { + @Override protected void readExternalData(byte protoVer, ObjectInput in) throws IOException, ClassNotFoundException { xid = U.readGridUuid(in); duration = in.readLong(); isolation = TransactionIsolation.fromOrdinal(in.readByte()); @@ -195,7 +215,14 @@ public class VisorTxInfo extends VisorDataTransferObject { primaryNodes = U.readCollection(in); state = TransactionState.fromOrdinal(in.readByte()); size = in.readInt(); - startTime = protoVer >= V2 ? in.readLong() : 0L; + if (protoVer >= V2) { + nearXid = U.readGridUuid(in); + + masterNodeIds = U.readCollection(in); + + startTime = in.readLong(); + } + } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java index 25a69d1..9919b7d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java @@ -21,7 +21,8 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.Comparator; -import java.util.ConcurrentModificationException; +import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.TreeMap; @@ -31,22 +32,32 @@ import java.util.regex.PatternSyntaxException; import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.compute.ComputeJobResult; +import org.apache.ignite.internal.IgniteInternalFuture; import org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote; import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; import org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxMappings; -import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx; +import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; import org.apache.ignite.internal.processors.task.GridInternal; +import org.apache.ignite.internal.util.future.GridFinishedFuture; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.VisorJob; import org.apache.ignite.internal.visor.VisorMultiNodeTask; import org.apache.ignite.internal.visor.VisorTaskArgument; +import org.apache.ignite.lang.IgniteBiClosure; import org.apache.ignite.lang.IgniteClosure; import org.apache.ignite.lang.IgnitePredicate; -import org.apache.ignite.lang.IgniteUuid; -import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionState; import org.jetbrains.annotations.Nullable; +import static org.apache.ignite.transactions.TransactionState.COMMITTED; +import static org.apache.ignite.transactions.TransactionState.COMMITTING; + /** * */ @@ -103,6 +114,8 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN @Nullable @Override protected Map<ClusterNode, VisorTxTaskResult> reduce0(List<ComputeJobResult> results) throws IgniteException { Map<ClusterNode, VisorTxTaskResult> mapRes = new TreeMap<>(); + Map<UUID, ClusterNode> nodeMap = new HashMap<>(); + for (ComputeJobResult result : results) { VisorTxTaskResult data = result.getData(); @@ -110,6 +123,47 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN continue; mapRes.put(result.getNode(), data); + + nodeMap.put(result.getNode().id(), result.getNode()); + } + + // Remove local and remote txs for which near txs are present. + for (VisorTxTaskResult result : mapRes.values()) { + List<VisorTxInfo> infos = result.getInfos(); + + Iterator<VisorTxInfo> it = infos.iterator(); + + while (it.hasNext()) { + VisorTxInfo info = it.next(); + + if (!info.getXid().equals(info.getNearXid())) { + UUID nearNodeId = info.getMasterNodeIds().iterator().next(); + + // Try find id. + ClusterNode node = nodeMap.get(nearNodeId); + + if (node == null) + continue; + + VisorTxTaskResult res0 = mapRes.get(node); + + if (res0 == null) + continue; + + boolean exists = false; + + for (VisorTxInfo txInfo : res0.getInfos()) { + if (txInfo.getXid().equals(info.getNearXid())) { + exists = true; + + break; + } + } + + if (exists) + it.remove(); + } + } } return mapRes; @@ -123,7 +177,16 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN private static final long serialVersionUID = 0L; /** */ - public static final int DEFAULT_LIMIT = 50; + private static final int DEFAULT_LIMIT = 50; + + /** */ + private static final TxKillClosure NEAR_KILL_CLOSURE = new NearKillClosure(); + + /** */ + private static final TxKillClosure LOCAL_KILL_CLOSURE = NEAR_KILL_CLOSURE; + + /** */ + private static final TxKillClosure REMOTE_KILL_CLOSURE = new RemoteKillClosure(); /** * @param arg Formal job argument. @@ -138,7 +201,9 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN if (arg == null) return new VisorTxTaskResult(Collections.emptyList()); - Collection<Transaction> transactions = ignite.transactions().localActiveTransactions(); + IgniteTxManager tm = ignite.context().cache().context().tm(); + + Collection<IgniteInternalTx> transactions = tm.activeTransactions(); List<VisorTxInfo> infos = new ArrayList<>(); @@ -155,50 +220,102 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN } } - for (Transaction transaction : transactions) { - GridNearTxLocal locTx = ((TransactionProxyImpl)transaction).tx(); - + for (IgniteInternalTx locTx : transactions) { if (arg.getXid() != null && !locTx.xid().toString().equals(arg.getXid())) continue; if (arg.getState() != null && locTx.state() != arg.getState()) continue; - long duration = U.currentTimeMillis() - transaction.startTime(); + long duration = U.currentTimeMillis() - locTx.startTime(); - if (arg.getMinDuration() != null && - duration < arg.getMinDuration()) + if (arg.getMinDuration() != null && duration < arg.getMinDuration()) continue; - if (lbMatch != null && !lbMatch.matcher(locTx.label() == null ? "null" : locTx.label()).matches()) - continue; + String lb = null; + int size = 0; + Collection<UUID> mappings = null; + TxKillClosure killClo = null; - Collection<UUID> mappings = new ArrayList<>(); + // This filter conditions have meaning only for near txs, so we skip dht because it will never match. + boolean skip = arg.getMinSize() != null || lbMatch != null; - int size = 0; + if (locTx instanceof GridNearTxLocal) { + GridNearTxLocal locTx0 = (GridNearTxLocal)locTx; + + lb = locTx0.label(); + + if (lbMatch != null && !lbMatch.matcher(lb == null ? "null" : lb).matches()) + continue; + + mappings = new ArrayList<>(); + + if (locTx0.mappings() != null) { + IgniteTxMappings txMappings = locTx0.mappings(); + + for (GridDistributedTxMapping mapping : + txMappings.single() ? Collections.singleton(txMappings.singleMapping()) : txMappings.mappings()) { + if (mapping == null) + continue; - if (locTx.mappings() != null) { - IgniteTxMappings txMappings = locTx.mappings(); + mappings.add(mapping.primary().id()); - for (GridDistributedTxMapping mapping : - txMappings.single() ? Collections.singleton(txMappings.singleMapping()) : txMappings.mappings()) { - if (mapping == null) - continue; + size += mapping.entries().size(); // Entries are not synchronized so no visibility guaranties for size. + } + } + + if (arg.getMinSize() != null && size < arg.getMinSize()) + continue; + + killClo = NEAR_KILL_CLOSURE; + } + else if (locTx instanceof GridDhtTxLocal) { + if (skip) + continue; + + GridDhtTxLocal locTx0 = (GridDhtTxLocal)locTx; + + Map<UUID, GridDistributedTxMapping> dhtMap = U.field(locTx0, "dhtMap"); + + mappings = new ArrayList<>(); + + if (dhtMap != null) { + for (GridDistributedTxMapping mapping : dhtMap.values()) { + mappings.add(mapping.primary().id()); + + size += mapping.entries().size(); + } + } + + Map<UUID, GridDistributedTxMapping> nearMap = U.field(locTx, "nearMap"); - mappings.add(mapping.primary().id()); + if (nearMap != null) { + for (GridDistributedTxMapping mapping : nearMap.values()) { + mappings.add(mapping.primary().id()); - size += mapping.entries().size(); // Entries are not synchronized so no visibility guaranties for size. + size += mapping.entries().size(); + } } + + killClo = LOCAL_KILL_CLOSURE; } + else if (locTx instanceof GridDhtTxRemote) { + if (skip) + continue; - if (arg.getMinSize() != null && size < arg.getMinSize()) - continue; + GridDhtTxRemote locTx0 = (GridDhtTxRemote)locTx; + + size = locTx0.readMap().size() + locTx.writeMap().size(); + + killClo = REMOTE_KILL_CLOSURE; + } infos.add(new VisorTxInfo(locTx.xid(), locTx.startTime(), duration, locTx.isolation(), locTx.concurrency(), - locTx.timeout(), locTx.label(), mappings, locTx.state(), size)); + locTx.timeout(), lb, mappings, locTx.state(), + size, locTx.nearXidVersion().asGridUuid(), locTx.masterNodeIds())); if (arg.getOperation() == VisorTxOperation.KILL) - locTx.rollbackAsync(); + killClo.apply(locTx, tm); if (infos.size() == limit) break; @@ -271,4 +388,43 @@ public class VisorTxTask extends VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN return Long.compare(o2.getSize(), o1.getSize()); } } + + /** Type shortcut. */ + private interface TxKillClosure extends + IgniteBiClosure<IgniteInternalTx, IgniteTxManager, IgniteInternalFuture<IgniteInternalTx>> { + } + + /** Kills near or local tx. */ + private static class NearKillClosure implements TxKillClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<IgniteInternalTx> apply(IgniteInternalTx tx, IgniteTxManager tm) { + return tx.isRollbackOnly() || tx.state() == COMMITTING || tx.state() == COMMITTED ? + new GridFinishedFuture<>() : tx.rollbackAsync(); + } + } + + /** Kills remote tx. */ + private static class RemoteKillClosure implements TxKillClosure { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public IgniteInternalFuture<IgniteInternalTx> apply(IgniteInternalTx tx, IgniteTxManager tm) { + IgniteTxRemoteEx remote = (IgniteTxRemoteEx)tx; + + if (tx.isRollbackOnly() || tx.state() == COMMITTING || tx.state() == COMMITTED) + return new GridFinishedFuture<>(); + + if (tx.state() == TransactionState.PREPARED) + remote.doneRemote(tx.xidVersion(), + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList(), + Collections.<GridCacheVersion>emptyList()); + + return tx.rollbackAsync(); + } + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java index 5d12d9a..b36bf16 100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -29,13 +30,16 @@ import org.apache.ignite.IgniteException; import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; import org.apache.ignite.internal.managers.communication.GridIoMessage; +import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; import org.apache.ignite.plugin.extensions.communication.Message; import org.apache.ignite.spi.IgniteSpiException; import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi; +import org.jetbrains.annotations.Nullable; import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME; @@ -199,6 +203,18 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** + * @param cnt Number of messages to wait. + * + * @throws InterruptedException If interrupted. + */ + public void waitForBlocked(int cnt) throws InterruptedException { + synchronized (this) { + while (blockedMsgs.size() < cnt) + wait(); + } + } + + /** * @throws InterruptedException If interrupted. */ public void waitForRecorded() throws InterruptedException { @@ -251,26 +267,38 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } /** - * Stops block messages and can sends all already blocked messages. + * Stops block messages and sends all already blocked messages. */ public void stopBlock() { - stopBlock(true); + stopBlock(true, null); } /** * Stops block messages and sends all already blocked messages if sndMsgs is 'true'. * - * @param sndMsgs If {@code true} sends blocked messages. + * @param sndMsgs {@code True} to send blocked messages. */ public void stopBlock(boolean sndMsgs) { - synchronized (this) { - blockP = null; + stopBlock(sndMsgs, null); + } + /** + * Stops block messages and sends all already blocked messages if sndMsgs is 'true' optionally filtered + * by unblockPred. + * + * @param sndMsgs If {@code true} sends blocked messages. + * @param unblockPred If not null unblocks only messages allowed by predicate. + */ + public void stopBlock(boolean sndMsgs, @Nullable IgnitePredicate<T2<ClusterNode, GridIoMessage>> unblockPred) { + synchronized (this) { blockCls.clear(); blockP = null; + Collection<T2<ClusterNode, GridIoMessage>> msgs = + unblockPred == null ? blockedMsgs : F.view(blockedMsgs, unblockPred); + if (sndMsgs) { - for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) { + for (T2<ClusterNode, GridIoMessage> msg : msgs) { try { ignite.log().info("Send blocked message [node=" + msg.get1().id() + ", order=" + msg.get1().order() + @@ -284,7 +312,7 @@ public class TestRecordingCommunicationSpi extends TcpCommunicationSpi { } } - blockedMsgs.clear(); + msgs.clear(); } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java index 62f1518..5dd9b2b 100644 --- a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java +++ b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.TreeMap; import java.util.concurrent.CountDownLatch; @@ -31,7 +32,10 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.List; +import java.util.concurrent.atomic.LongAdder; +import javax.cache.processor.EntryProcessor; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; import org.apache.ignite.Ignite; import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCache; @@ -46,26 +50,45 @@ import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.WALMode; import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInternalFuture; +import org.apache.ignite.internal.TestRecordingCommunicationSpi; import org.apache.ignite.internal.commandline.CommandHandler; import org.apache.ignite.internal.commandline.cache.CacheCommand; +import org.apache.ignite.internal.managers.communication.GridIoMessage; import org.apache.ignite.internal.pagemem.wal.record.DataEntry; import org.apache.ignite.internal.processors.cache.CacheObjectImpl; import org.apache.ignite.internal.processors.cache.GridCacheContext; +import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; import org.apache.ignite.internal.processors.cache.GridCacheFuture; +import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; +import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; +import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest; +import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; +import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; +import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; import org.apache.ignite.internal.processors.cache.GridCacheOperation; import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl; import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; +import org.apache.ignite.internal.util.lang.GridAbsPredicate; import org.apache.ignite.internal.util.typedef.F; import org.apache.ignite.internal.util.typedef.G; +import org.apache.ignite.internal.util.typedef.T2; import org.apache.ignite.internal.util.typedef.X; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.internal.visor.tx.VisorTxInfo; import org.apache.ignite.internal.visor.tx.VisorTxTaskResult; +import org.apache.ignite.lang.IgniteBiPredicate; import org.apache.ignite.lang.IgniteInClosure; +import org.apache.ignite.lang.IgnitePredicate; +import org.apache.ignite.plugin.extensions.communication.Message; +import org.apache.ignite.testframework.GridTestUtils; import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest; import org.apache.ignite.transactions.Transaction; import org.apache.ignite.transactions.TransactionRollbackException; +import org.apache.ignite.transactions.TransactionTimeoutException; import static org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND; import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL; @@ -137,10 +160,12 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { @Override protected IgniteConfiguration getConfiguration(String igniteInstanceName) throws Exception { IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName); + cfg.setCommunicationSpi(new TestRecordingCommunicationSpi()); + cfg.setConnectorConfiguration(new ConnectorConfiguration()); DataStorageConfiguration memCfg = new DataStorageConfiguration().setDefaultDataRegionConfiguration( - new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024)); + new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024)); cfg.setDataStorageConfiguration(memCfg); @@ -150,7 +175,7 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { cfg.setConsistentId(igniteInstanceName); - cfg.setClientMode("client".equals(igniteInstanceName)); + cfg.setClientMode(igniteInstanceName.startsWith("client")); return cfg; } @@ -469,26 +494,22 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { assertNotNull(res); - for (VisorTxInfo txInfo : res.getInfos()) { + for (VisorTxInfo txInfo : res.getInfos()) assertTrue(txInfo.getSize() >= minSize); - - } }, "--tx", "minSize", Integer.toString(minSize)); // test order by size. validate(h, map -> { VisorTxTaskResult res = map.get(grid(0).localNode()); - assertTrue(res.getInfos().get(0).getSize() >= res.getInfos().get(1).getSize()); - + assertTrue(res.getInfos().get(0).getSize() >= res.getInfos().get(1).getSize()); }, "--tx", "order", "SIZE"); // test order by duration. validate(h, map -> { VisorTxTaskResult res = map.get(grid(0).localNode()); - assertTrue(res.getInfos().get(0).getDuration() >= res.getInfos().get(1).getDuration()); - + assertTrue(res.getInfos().get(0).getDuration() >= res.getInfos().get(1).getDuration()); }, "--tx", "order", "DURATION"); // test order by start_time. @@ -538,6 +559,251 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { } /** + * + */ + public void testKillHangingLocalTransactions() throws Exception { + Ignite ignite = startGridsMultiThreaded(2); + + ignite.cluster().active(true); + + Ignite client = startGrid("client"); + + client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME). + setAtomicityMode(TRANSACTIONAL). + setWriteSynchronizationMode(FULL_SYNC). + setAffinity(new RendezvousAffinityFunction(false, 64))); + + Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME); + + // Blocks lock response to near node. + TestRecordingCommunicationSpi.spi(prim).blockMessages(GridNearLockResponse.class, client.name()); + + TestRecordingCommunicationSpi.spi(client).blockMessages(GridNearTxFinishRequest.class, prim.name()); + + GridNearTxLocal clientTx = null; + + try(Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 2000, 1)) { + clientTx = ((TransactionProxyImpl)tx).tx(); + + client.cache(DEFAULT_CACHE_NAME).put(0L, 0L); + + fail(); + } + catch (Exception e) { + assertTrue(X.hasCause(e, TransactionTimeoutException.class)); + } + + assertNotNull(clientTx); + + IgniteEx primEx = (IgniteEx)prim; + + IgniteInternalTx tx0 = primEx.context().cache().context().tm().activeTransactions().iterator().next(); + + assertNotNull(tx0); + + CommandHandler h = new CommandHandler(); + + validate(h, map -> { + ClusterNode node = grid(0).cluster().localNode(); + + VisorTxTaskResult res = map.get(node); + + for (VisorTxInfo info : res.getInfos()) + assertEquals(tx0.xid(), info.getXid()); + + assertEquals(1, map.size()); + }, "--tx", "kill"); + + tx0.finishFuture().get(); + + TestRecordingCommunicationSpi.spi(prim).stopBlock(); + + TestRecordingCommunicationSpi.spi(client).stopBlock(); + + IgniteInternalFuture<?> nearFinFut = U.field(clientTx, "finishFut"); + + nearFinFut.get(); + + checkFutures(); + } + + /** + * Simulate uncommitted backup transactions and test rolling back using utility. + */ + public void testKillHangingRemoteTransactions() throws Exception { + final int cnt = 3; + + startGridsMultiThreaded(cnt); + + Ignite[] clients = new Ignite[] { + startGrid("client1"), + startGrid("client2"), + startGrid("client3"), + startGrid("client4") + }; + + clients[0].getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME). + setBackups(2). + setAtomicityMode(TRANSACTIONAL). + setWriteSynchronizationMode(FULL_SYNC). + setAffinity(new RendezvousAffinityFunction(false, 64))); + + for (Ignite client : clients) { + assertTrue(client.configuration().isClientMode()); + + assertNotNull(client.cache(DEFAULT_CACHE_NAME)); + } + + LongAdder progress = new LongAdder(); + + AtomicInteger idx = new AtomicInteger(); + + int tc = clients.length; + + CountDownLatch lockLatch = new CountDownLatch(1); + CountDownLatch commitLatch = new CountDownLatch(1); + + Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME); + + TestRecordingCommunicationSpi primSpi = TestRecordingCommunicationSpi.spi(prim); + + primSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() { + @Override public boolean apply(ClusterNode node, Message message) { + return message instanceof GridDhtTxFinishRequest; + } + }); + + IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() { + @Override public void run() { + int id = idx.getAndIncrement(); + + Ignite client = clients[id]; + + try (Transaction tx = client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 0, 1)) { + IgniteCache<Long, Long> cache = client.cache(DEFAULT_CACHE_NAME); + + if (id != 0) + U.awaitQuiet(lockLatch); + + cache.invoke(0L, new IncrementClosure(), null); + + if (id == 0) { + lockLatch.countDown(); + + U.awaitQuiet(commitLatch); + + doSleep(500); // Wait until candidates will enqueue. + } + + tx.commit(); + } + catch (Exception e) { + assertTrue(X.hasCause(e, TransactionTimeoutException.class)); + } + + progress.increment(); + + } + }, tc, "invoke-thread"); + + U.awaitQuiet(lockLatch); + + commitLatch.countDown(); + + primSpi.waitForBlocked(clients.length); + + // Unblock only finish messages from clients from 2 to 4. + primSpi.stopBlock(true, new IgnitePredicate<T2<ClusterNode,GridIoMessage>>() { + @Override public boolean apply(T2<ClusterNode, GridIoMessage> objects) { + GridIoMessage iom = objects.get2(); + + Message m = iom.message(); + + if (m instanceof GridDhtTxFinishRequest) { + GridDhtTxFinishRequest r = (GridDhtTxFinishRequest)m; + + if (r.nearNodeId().equals(clients[0].cluster().localNode().id())) + return false; + } + + return true; + } + }); + + // Wait until queue is stable + for (Ignite ignite : G.allGrids()) { + if (ignite.configuration().isClientMode()) + continue; + + Collection<IgniteInternalTx> txs = ((IgniteEx)ignite).context().cache().context().tm().activeTransactions(); + + GridTestUtils.waitForCondition(new GridAbsPredicate() { + @Override public boolean apply() { + for (IgniteInternalTx tx : txs) + if (!tx.local()) { + IgniteTxEntry entry = tx.writeEntries().iterator().next(); + + GridCacheEntryEx cached = entry.cached(); + + Collection<GridCacheMvccCandidate> candidates = cached.remoteMvccSnapshot(); + + if (candidates.size() != clients.length) + return false; + } + + return true; + } + }, 10_000); + } + + CommandHandler h = new CommandHandler(); + + // Check listing. + validate(h, map -> { + for (int i = 0; i < cnt; i++) { + IgniteEx grid = grid(i); + + // Skip primary. + if (grid.localNode().id().equals(prim.cluster().localNode().id())) + continue; + + VisorTxTaskResult res = map.get(grid.localNode()); + + // Validate queue length on backups. + assertEquals(clients.length, res.getInfos().size()); + } + }, "--tx"); + + // Check kill. + validate(h, map -> { + // No-op. + }, "--tx", "kill"); + + // Wait for all remote txs to finish. + for (Ignite ignite : G.allGrids()) { + if (ignite.configuration().isClientMode()) + continue; + + Collection<IgniteInternalTx> txs = ((IgniteEx)ignite).context().cache().context().tm().activeTransactions(); + + for (IgniteInternalTx tx : txs) + if (!tx.local()) + tx.finishFuture().get(); + } + + // Unblock finish message from client1. + primSpi.stopBlock(true); + + fut.get(); + + Long cur = (Long)clients[0].cache(DEFAULT_CACHE_NAME).get(0L); + + assertEquals(tc - 1, cur.longValue()); + + checkFutures(); + } + + /** * Test baseline add items works via control.sh * * @throws Exception If failed. @@ -1006,6 +1272,23 @@ public class GridCommandHandlerTest extends GridCommonAbstractTest { log.info("Waiting for future: " + fut); assertTrue("Expecting no active futures: node=" + ig.localNode().id(), futs.isEmpty()); + + Collection<IgniteInternalTx> txs = ig.context().cache().context().tm().activeTransactions(); + + for (IgniteInternalTx tx : txs) + log.info("Waiting for tx: " + tx); + + assertTrue("Expecting no active transactions: node=" + ig.localNode().id(), txs.isEmpty()); + } + } + + /** */ + private static class IncrementClosure implements EntryProcessor<Long, Long, Void> { + /** {@inheritDoc} */ + @Override public Void process(MutableEntry<Long, Long> entry, Object... arguments) throws EntryProcessorException { + entry.setValue(entry.exists() ? entry.getValue() + 1 : 0); + + return null; } }