Repository: ignite
Updated Branches:
  refs/heads/ignite-8446 7f89fa52f -> cc8591a1a


IGNITE-8808 Improve control.sh --tx command to show local and remote 
transactions. - Fixes #4209.

Signed-off-by: Alexey Goncharuk <alexey.goncha...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/d305c663
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/d305c663
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/d305c663

Branch: refs/heads/ignite-8446
Commit: d305c6634032c152fd2432a31f465e0083c715a2
Parents: cf09e76
Author: Aleksei Scherbakov <alexey.scherbak...@gmail.com>
Authored: Tue Jun 19 20:12:13 2018 +0300
Committer: Alexey Goncharuk <alexey.goncha...@gmail.com>
Committed: Tue Jun 19 20:12:13 2018 +0300

----------------------------------------------------------------------
 .../internal/commandline/CommandHandler.java    |  18 +-
 .../ignite/internal/visor/tx/VisorTxInfo.java   |  41 ++-
 .../ignite/internal/visor/tx/VisorTxTask.java   | 212 +++++++++++--
 .../internal/TestRecordingCommunicationSpi.java |  42 ++-
 .../ignite/util/GridCommandHandlerTest.java     | 303 ++++++++++++++++++-
 5 files changed, 559 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
index ed85f0c..cf05699 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/commandline/CommandHandler.java
@@ -1088,11 +1088,19 @@ public class CommandHandler {
                         ", concurrency=" + info.getConcurrency() +
                         ", timeout=" + info.getTimeout() +
                         ", size=" + info.getSize() +
-                        ", dhtNodes=" + F.transform(info.getPrimaryNodes(), 
new IgniteClosure<UUID, String>() {
-                        @Override public String apply(UUID id) {
-                            return U.id8(id);
-                        }
-                    }) +
+                        ", dhtNodes=" + (info.getPrimaryNodes() == null ? 
"N/A" :
+                        F.transform(info.getPrimaryNodes(), new 
IgniteClosure<UUID, String>() {
+                            @Override public String apply(UUID id) {
+                                return U.id8(id);
+                            }
+                        })) +
+                        ", nearXid=" + info.getNearXid() +
+                        ", parentNodeIds=" + (info.getMasterNodeIds() == null 
? "N/A" :
+                        F.transform(info.getMasterNodeIds(), new 
IgniteClosure<UUID, String>() {
+                            @Override public String apply(UUID id) {
+                                return U.id8(id);
+                            }
+                        })) +
                         ']');
             }
         }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
index ecf3e0d..03de5b0 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxInfo.java
@@ -33,6 +33,7 @@ import org.apache.ignite.lang.IgniteUuid;
 import org.apache.ignite.transactions.TransactionConcurrency;
 import org.apache.ignite.transactions.TransactionIsolation;
 import org.apache.ignite.transactions.TransactionState;
+import org.jetbrains.annotations.Nullable;
 
 /**
  */
@@ -75,6 +76,12 @@ public class VisorTxInfo extends VisorDataTransferObject {
     /** */
     private int size;
 
+    /** */
+    private IgniteUuid nearXid;
+
+    /** */
+    private Collection<UUID> masterNodeIds;
+
     /**
      * Default constructor.
      */
@@ -96,7 +103,7 @@ public class VisorTxInfo extends VisorDataTransferObject {
      */
     public VisorTxInfo(IgniteUuid xid, long startTime, long duration, 
TransactionIsolation isolation,
         TransactionConcurrency concurrency, long timeout, String lb, 
Collection<UUID> primaryNodes,
-        TransactionState state, int size) {
+        TransactionState state, int size, IgniteUuid nearXid, Collection<UUID> 
masterNodeIds) {
         this.xid = xid;
         this.startTime = startTime;
         this.duration = duration;
@@ -107,6 +114,13 @@ public class VisorTxInfo extends VisorDataTransferObject {
         this.primaryNodes = primaryNodes;
         this.state = state;
         this.size = size;
+        this.nearXid = nearXid;
+        this.masterNodeIds = masterNodeIds;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte getProtocolVersion() {
+        return V2;
     }
 
     /** */
@@ -164,9 +178,14 @@ public class VisorTxInfo extends VisorDataTransferObject {
         return size;
     }
 
-    /** {@inheritDoc} */
-    @Override public byte getProtocolVersion() {
-        return V2;
+    /** */
+    public @Nullable IgniteUuid getNearXid() {
+        return nearXid;
+    }
+
+    /** */
+    public @Nullable Collection<UUID> getMasterNodeIds() {
+        return masterNodeIds;
     }
 
     /** {@inheritDoc} */
@@ -180,12 +199,13 @@ public class VisorTxInfo extends VisorDataTransferObject {
         U.writeCollection(out, primaryNodes);
         U.writeEnum(out, state);
         out.writeInt(size);
+        U.writeGridUuid(out, nearXid);
+        U.writeCollection(out, masterNodeIds);
         out.writeLong(startTime);
     }
 
     /** {@inheritDoc} */
-    @Override protected void readExternalData(byte protoVer,
-        ObjectInput in) throws IOException, ClassNotFoundException {
+    @Override protected void readExternalData(byte protoVer, ObjectInput in) 
throws IOException, ClassNotFoundException {
         xid = U.readGridUuid(in);
         duration = in.readLong();
         isolation = TransactionIsolation.fromOrdinal(in.readByte());
@@ -195,7 +215,14 @@ public class VisorTxInfo extends VisorDataTransferObject {
         primaryNodes = U.readCollection(in);
         state = TransactionState.fromOrdinal(in.readByte());
         size = in.readInt();
-        startTime = protoVer >= V2 ? in.readLong() : 0L;
+        if (protoVer >= V2) {
+            nearXid = U.readGridUuid(in);
+
+            masterNodeIds = U.readCollection(in);
+
+            startTime = in.readLong();
+        }
+
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
index 25a69d1..9919b7d 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/visor/tx/VisorTxTask.java
@@ -21,7 +21,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.ConcurrentModificationException;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
@@ -31,22 +32,32 @@ import java.util.regex.PatternSyntaxException;
 import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.compute.ComputeJobResult;
+import org.apache.ignite.internal.IgniteInternalFuture;
 import 
org.apache.ignite.internal.processors.cache.distributed.GridDistributedTxMapping;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxLocal;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxRemote;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
 import 
org.apache.ignite.internal.processors.cache.distributed.near.IgniteTxMappings;
-import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteTxRemoteEx;
+import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
 import org.apache.ignite.internal.processors.task.GridInternal;
+import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.VisorJob;
 import org.apache.ignite.internal.visor.VisorMultiNodeTask;
 import org.apache.ignite.internal.visor.VisorTaskArgument;
+import org.apache.ignite.lang.IgniteBiClosure;
 import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.lang.IgnitePredicate;
-import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionState;
 import org.jetbrains.annotations.Nullable;
 
+import static org.apache.ignite.transactions.TransactionState.COMMITTED;
+import static org.apache.ignite.transactions.TransactionState.COMMITTING;
+
 /**
  *
  */
@@ -103,6 +114,8 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
     @Nullable @Override protected Map<ClusterNode, VisorTxTaskResult> 
reduce0(List<ComputeJobResult> results) throws IgniteException {
         Map<ClusterNode, VisorTxTaskResult> mapRes = new TreeMap<>();
 
+        Map<UUID, ClusterNode> nodeMap = new HashMap<>();
+
         for (ComputeJobResult result : results) {
             VisorTxTaskResult data = result.getData();
 
@@ -110,6 +123,47 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
                 continue;
 
             mapRes.put(result.getNode(), data);
+
+            nodeMap.put(result.getNode().id(), result.getNode());
+        }
+
+        // Remove local and remote txs for which near txs are present.
+        for (VisorTxTaskResult result : mapRes.values()) {
+            List<VisorTxInfo> infos = result.getInfos();
+
+            Iterator<VisorTxInfo> it = infos.iterator();
+
+            while (it.hasNext()) {
+                VisorTxInfo info = it.next();
+
+                if (!info.getXid().equals(info.getNearXid())) {
+                    UUID nearNodeId = 
info.getMasterNodeIds().iterator().next();
+
+                    // Try find id.
+                    ClusterNode node = nodeMap.get(nearNodeId);
+
+                    if (node == null)
+                        continue;
+
+                    VisorTxTaskResult res0 = mapRes.get(node);
+
+                    if (res0 == null)
+                        continue;
+
+                    boolean exists = false;
+
+                    for (VisorTxInfo txInfo : res0.getInfos()) {
+                        if (txInfo.getXid().equals(info.getNearXid())) {
+                            exists = true;
+
+                            break;
+                        }
+                    }
+
+                    if (exists)
+                        it.remove();
+                }
+            }
         }
 
         return mapRes;
@@ -123,7 +177,16 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
         private static final long serialVersionUID = 0L;
 
         /** */
-        public static final int DEFAULT_LIMIT = 50;
+        private static final int DEFAULT_LIMIT = 50;
+
+        /** */
+        private static final TxKillClosure NEAR_KILL_CLOSURE = new 
NearKillClosure();
+
+        /** */
+        private static final TxKillClosure LOCAL_KILL_CLOSURE = 
NEAR_KILL_CLOSURE;
+
+        /** */
+        private static final TxKillClosure REMOTE_KILL_CLOSURE = new 
RemoteKillClosure();
 
         /**
          * @param arg Formal job argument.
@@ -138,7 +201,9 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
             if (arg == null)
                 return new VisorTxTaskResult(Collections.emptyList());
 
-            Collection<Transaction> transactions = 
ignite.transactions().localActiveTransactions();
+            IgniteTxManager tm = ignite.context().cache().context().tm();
+
+            Collection<IgniteInternalTx> transactions = 
tm.activeTransactions();
 
             List<VisorTxInfo> infos = new ArrayList<>();
 
@@ -155,50 +220,102 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
                 }
             }
 
-            for (Transaction transaction : transactions) {
-                GridNearTxLocal locTx = 
((TransactionProxyImpl)transaction).tx();
-
+            for (IgniteInternalTx locTx : transactions) {
                 if (arg.getXid() != null && 
!locTx.xid().toString().equals(arg.getXid()))
                     continue;
 
                 if (arg.getState() != null && locTx.state() != arg.getState())
                     continue;
 
-                long duration = U.currentTimeMillis() - 
transaction.startTime();
+                long duration = U.currentTimeMillis() - locTx.startTime();
 
-                if (arg.getMinDuration() != null &&
-                    duration < arg.getMinDuration())
+                if (arg.getMinDuration() != null && duration < 
arg.getMinDuration())
                     continue;
 
-                if (lbMatch != null && !lbMatch.matcher(locTx.label() == null 
? "null" : locTx.label()).matches())
-                    continue;
+                String lb = null;
+                int size = 0;
+                Collection<UUID> mappings = null;
+                TxKillClosure killClo = null;
 
-                Collection<UUID> mappings = new ArrayList<>();
+                // This filter conditions have meaning only for near txs, so 
we skip dht because it will never match.
+                boolean skip = arg.getMinSize() != null || lbMatch != null;
 
-                int size = 0;
+                if (locTx instanceof GridNearTxLocal) {
+                    GridNearTxLocal locTx0 = (GridNearTxLocal)locTx;
+
+                    lb = locTx0.label();
+
+                    if (lbMatch != null && !lbMatch.matcher(lb == null ? 
"null" : lb).matches())
+                        continue;
+
+                    mappings = new ArrayList<>();
+
+                    if (locTx0.mappings() != null) {
+                        IgniteTxMappings txMappings = locTx0.mappings();
+
+                        for (GridDistributedTxMapping mapping :
+                            txMappings.single() ? 
Collections.singleton(txMappings.singleMapping()) : txMappings.mappings()) {
+                            if (mapping == null)
+                                continue;
 
-                if (locTx.mappings() != null) {
-                    IgniteTxMappings txMappings = locTx.mappings();
+                            mappings.add(mapping.primary().id());
 
-                    for (GridDistributedTxMapping mapping :
-                        txMappings.single() ? 
Collections.singleton(txMappings.singleMapping()) : txMappings.mappings()) {
-                        if (mapping == null)
-                            continue;
+                            size += mapping.entries().size(); // Entries are 
not synchronized so no visibility guaranties for size.
+                        }
+                    }
+
+                    if (arg.getMinSize() != null && size < arg.getMinSize())
+                        continue;
+
+                    killClo = NEAR_KILL_CLOSURE;
+                }
+                else if (locTx instanceof GridDhtTxLocal) {
+                    if (skip)
+                        continue;
+
+                    GridDhtTxLocal locTx0 = (GridDhtTxLocal)locTx;
+
+                    Map<UUID, GridDistributedTxMapping> dhtMap = 
U.field(locTx0, "dhtMap");
+
+                    mappings = new ArrayList<>();
+
+                    if (dhtMap != null) {
+                        for (GridDistributedTxMapping mapping : 
dhtMap.values()) {
+                            mappings.add(mapping.primary().id());
+
+                            size += mapping.entries().size();
+                        }
+                    }
+
+                    Map<UUID, GridDistributedTxMapping> nearMap = 
U.field(locTx, "nearMap");
 
-                        mappings.add(mapping.primary().id());
+                    if (nearMap != null) {
+                        for (GridDistributedTxMapping mapping : 
nearMap.values()) {
+                            mappings.add(mapping.primary().id());
 
-                        size += mapping.entries().size(); // Entries are not 
synchronized so no visibility guaranties for size.
+                            size += mapping.entries().size();
+                        }
                     }
+
+                    killClo = LOCAL_KILL_CLOSURE;
                 }
+                else if (locTx instanceof GridDhtTxRemote) {
+                    if (skip)
+                        continue;
 
-                if (arg.getMinSize() != null && size < arg.getMinSize())
-                    continue;
+                    GridDhtTxRemote locTx0 = (GridDhtTxRemote)locTx;
+
+                    size = locTx0.readMap().size() + locTx.writeMap().size();
+
+                    killClo = REMOTE_KILL_CLOSURE;
+                }
 
                 infos.add(new VisorTxInfo(locTx.xid(), locTx.startTime(), 
duration, locTx.isolation(), locTx.concurrency(),
-                    locTx.timeout(), locTx.label(), mappings, locTx.state(), 
size));
+                    locTx.timeout(), lb, mappings, locTx.state(),
+                    size, locTx.nearXidVersion().asGridUuid(), 
locTx.masterNodeIds()));
 
                 if (arg.getOperation() == VisorTxOperation.KILL)
-                    locTx.rollbackAsync();
+                    killClo.apply(locTx, tm);
 
                 if (infos.size() == limit)
                     break;
@@ -271,4 +388,43 @@ public class VisorTxTask extends 
VisorMultiNodeTask<VisorTxTaskArg, Map<ClusterN
             return Long.compare(o2.getSize(), o1.getSize());
         }
     }
+
+    /** Type shortcut. */
+    private interface TxKillClosure extends
+        IgniteBiClosure<IgniteInternalTx, IgniteTxManager, 
IgniteInternalFuture<IgniteInternalTx>> {
+    }
+
+    /** Kills near or local tx. */
+    private static class NearKillClosure implements TxKillClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture<IgniteInternalTx> 
apply(IgniteInternalTx tx, IgniteTxManager tm) {
+            return tx.isRollbackOnly() || tx.state() == COMMITTING || 
tx.state() == COMMITTED ?
+                new GridFinishedFuture<>() : tx.rollbackAsync();
+        }
+    }
+
+    /** Kills remote tx. */
+    private static class RemoteKillClosure implements TxKillClosure {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** {@inheritDoc} */
+        @Override public IgniteInternalFuture<IgniteInternalTx> 
apply(IgniteInternalTx tx, IgniteTxManager tm) {
+            IgniteTxRemoteEx remote = (IgniteTxRemoteEx)tx;
+
+            if (tx.isRollbackOnly() || tx.state() == COMMITTING || tx.state() 
== COMMITTED)
+                return new GridFinishedFuture<>();
+
+            if (tx.state() == TransactionState.PREPARED)
+                remote.doneRemote(tx.xidVersion(),
+                    Collections.<GridCacheVersion>emptyList(),
+                    Collections.<GridCacheVersion>emptyList(),
+                    Collections.<GridCacheVersion>emptyList());
+
+            return tx.rollbackAsync();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
index 5d12d9a..b36bf16 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/TestRecordingCommunicationSpi.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -29,13 +30,16 @@ import org.apache.ignite.IgniteException;
 import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoMessage;
+import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.spi.IgniteSpiException;
 import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
+import org.jetbrains.annotations.Nullable;
 
 import static 
org.apache.ignite.internal.IgniteNodeAttributes.ATTR_IGNITE_INSTANCE_NAME;
 
@@ -199,6 +203,18 @@ public class TestRecordingCommunicationSpi extends 
TcpCommunicationSpi {
     }
 
     /**
+     * @param cnt Number of messages to wait.
+     *
+     * @throws InterruptedException If interrupted.
+     */
+    public void waitForBlocked(int cnt) throws InterruptedException {
+        synchronized (this) {
+            while (blockedMsgs.size() < cnt)
+                wait();
+        }
+    }
+
+    /**
      * @throws InterruptedException If interrupted.
      */
     public void waitForRecorded() throws InterruptedException {
@@ -251,26 +267,38 @@ public class TestRecordingCommunicationSpi extends 
TcpCommunicationSpi {
     }
 
     /**
-     * Stops block messages and can sends all already blocked messages.
+     * Stops block messages and sends all already blocked messages.
      */
     public void stopBlock() {
-        stopBlock(true);
+        stopBlock(true, null);
     }
 
     /**
      * Stops block messages and sends all already blocked messages if sndMsgs 
is 'true'.
      *
-     * @param sndMsgs If {@code true} sends blocked messages.
+     * @param sndMsgs {@code True} to send blocked messages.
      */
     public void stopBlock(boolean sndMsgs) {
-        synchronized (this) {
-            blockP = null;
+        stopBlock(sndMsgs, null);
+    }
 
+    /**
+     * Stops block messages and sends all already blocked messages if sndMsgs 
is 'true' optionally filtered
+     * by unblockPred.
+     *
+     * @param sndMsgs If {@code true} sends blocked messages.
+     * @param unblockPred If not null unblocks only messages allowed by 
predicate.
+     */
+    public void stopBlock(boolean sndMsgs, @Nullable 
IgnitePredicate<T2<ClusterNode, GridIoMessage>> unblockPred) {
+        synchronized (this) {
             blockCls.clear();
             blockP = null;
 
+            Collection<T2<ClusterNode, GridIoMessage>> msgs =
+                unblockPred == null ? blockedMsgs : F.view(blockedMsgs, 
unblockPred);
+
             if (sndMsgs) {
-                for (T2<ClusterNode, GridIoMessage> msg : blockedMsgs) {
+                for (T2<ClusterNode, GridIoMessage> msg : msgs) {
                     try {
                         ignite.log().info("Send blocked message [node=" + 
msg.get1().id() +
                             ", order=" + msg.get1().order() +
@@ -284,7 +312,7 @@ public class TestRecordingCommunicationSpi extends 
TcpCommunicationSpi {
                 }
             }
 
-            blockedMsgs.clear();
+            msgs.clear();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/d305c663/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java 
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
index 62f1518..5dd9b2b 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/util/GridCommandHandlerTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.CountDownLatch;
@@ -31,7 +32,10 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.List;
+import java.util.concurrent.atomic.LongAdder;
+import javax.cache.processor.EntryProcessor;
+import javax.cache.processor.EntryProcessorException;
+import javax.cache.processor.MutableEntry;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.IgniteAtomicSequence;
 import org.apache.ignite.IgniteCache;
@@ -46,26 +50,45 @@ import org.apache.ignite.configuration.IgniteConfiguration;
 import org.apache.ignite.configuration.WALMode;
 import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInternalFuture;
+import org.apache.ignite.internal.TestRecordingCommunicationSpi;
 import org.apache.ignite.internal.commandline.CommandHandler;
 import org.apache.ignite.internal.commandline.cache.CacheCommand;
+import org.apache.ignite.internal.managers.communication.GridIoMessage;
 import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
 import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheContext;
+import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
 import org.apache.ignite.internal.processors.cache.GridCacheFuture;
+import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
+import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
+import 
org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTxFinishRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxFinishRequest;
+import 
org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
+import 
org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.cache.transactions.IgniteTxEntry;
+import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
 import org.apache.ignite.internal.processors.cache.GridCacheOperation;
 import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
 import 
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.lang.GridAbsPredicate;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.G;
+import org.apache.ignite.internal.util.typedef.T2;
 import org.apache.ignite.internal.util.typedef.X;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.internal.visor.tx.VisorTxInfo;
 import org.apache.ignite.internal.visor.tx.VisorTxTaskResult;
+import org.apache.ignite.lang.IgniteBiPredicate;
 import org.apache.ignite.lang.IgniteInClosure;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.transactions.Transaction;
 import org.apache.ignite.transactions.TransactionRollbackException;
+import org.apache.ignite.transactions.TransactionTimeoutException;
 
 import static 
org.apache.ignite.IgniteSystemProperties.IGNITE_ENABLE_EXPERIMENTAL_COMMAND;
 import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL;
@@ -137,10 +160,12 @@ public class GridCommandHandlerTest extends 
GridCommonAbstractTest {
     @Override protected IgniteConfiguration getConfiguration(String 
igniteInstanceName) throws Exception {
         IgniteConfiguration cfg = super.getConfiguration(igniteInstanceName);
 
+        cfg.setCommunicationSpi(new TestRecordingCommunicationSpi());
+
         cfg.setConnectorConfiguration(new ConnectorConfiguration());
 
         DataStorageConfiguration memCfg = new 
DataStorageConfiguration().setDefaultDataRegionConfiguration(
-            new DataRegionConfiguration().setMaxSize(100L * 1024 * 1024));
+            new DataRegionConfiguration().setMaxSize(50L * 1024 * 1024));
 
         cfg.setDataStorageConfiguration(memCfg);
 
@@ -150,7 +175,7 @@ public class GridCommandHandlerTest extends 
GridCommonAbstractTest {
 
         cfg.setConsistentId(igniteInstanceName);
 
-        cfg.setClientMode("client".equals(igniteInstanceName));
+        cfg.setClientMode(igniteInstanceName.startsWith("client"));
 
         return cfg;
     }
@@ -469,26 +494,22 @@ public class GridCommandHandlerTest extends 
GridCommonAbstractTest {
 
             assertNotNull(res);
 
-            for (VisorTxInfo txInfo : res.getInfos()) {
+            for (VisorTxInfo txInfo : res.getInfos())
                 assertTrue(txInfo.getSize() >= minSize);
-
-            }
         }, "--tx", "minSize", Integer.toString(minSize));
 
         // test order by size.
         validate(h, map -> {
             VisorTxTaskResult res = map.get(grid(0).localNode());
 
-            assertTrue(res.getInfos().get(0).getSize() >= 
res.getInfos().get(1).getSize());
-
+            assertTrue(res.getInfos().get(0).getSize() >=  
res.getInfos().get(1).getSize());
         }, "--tx", "order", "SIZE");
 
         // test order by duration.
         validate(h, map -> {
             VisorTxTaskResult res = map.get(grid(0).localNode());
 
-            assertTrue(res.getInfos().get(0).getDuration() >= 
res.getInfos().get(1).getDuration());
-
+            assertTrue(res.getInfos().get(0).getDuration() >=  
res.getInfos().get(1).getDuration());
         }, "--tx", "order", "DURATION");
 
         // test order by start_time.
@@ -538,6 +559,251 @@ public class GridCommandHandlerTest extends 
GridCommonAbstractTest {
     }
 
     /**
+     *
+     */
+    public void testKillHangingLocalTransactions() throws Exception {
+        Ignite ignite = startGridsMultiThreaded(2);
+
+        ignite.cluster().active(true);
+
+        Ignite client = startGrid("client");
+
+        client.getOrCreateCache(new CacheConfiguration<>(DEFAULT_CACHE_NAME).
+            setAtomicityMode(TRANSACTIONAL).
+            setWriteSynchronizationMode(FULL_SYNC).
+            setAffinity(new RendezvousAffinityFunction(false, 64)));
+
+        Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+
+        // Blocks lock response to near node.
+        
TestRecordingCommunicationSpi.spi(prim).blockMessages(GridNearLockResponse.class,
 client.name());
+
+        
TestRecordingCommunicationSpi.spi(client).blockMessages(GridNearTxFinishRequest.class,
 prim.name());
+
+        GridNearTxLocal clientTx = null;
+
+        try(Transaction tx = client.transactions().txStart(PESSIMISTIC, 
READ_COMMITTED, 2000, 1)) {
+            clientTx = ((TransactionProxyImpl)tx).tx();
+
+            client.cache(DEFAULT_CACHE_NAME).put(0L, 0L);
+
+            fail();
+        }
+        catch (Exception e) {
+            assertTrue(X.hasCause(e, TransactionTimeoutException.class));
+        }
+
+        assertNotNull(clientTx);
+
+        IgniteEx primEx = (IgniteEx)prim;
+
+        IgniteInternalTx tx0 = 
primEx.context().cache().context().tm().activeTransactions().iterator().next();
+
+        assertNotNull(tx0);
+
+        CommandHandler h = new CommandHandler();
+
+        validate(h, map -> {
+            ClusterNode node = grid(0).cluster().localNode();
+
+            VisorTxTaskResult res = map.get(node);
+
+            for (VisorTxInfo info : res.getInfos())
+                assertEquals(tx0.xid(), info.getXid());
+
+            assertEquals(1, map.size());
+        }, "--tx", "kill");
+
+        tx0.finishFuture().get();
+
+        TestRecordingCommunicationSpi.spi(prim).stopBlock();
+
+        TestRecordingCommunicationSpi.spi(client).stopBlock();
+
+        IgniteInternalFuture<?> nearFinFut = U.field(clientTx, "finishFut");
+
+        nearFinFut.get();
+
+        checkFutures();
+    }
+
+    /**
+     * Simulate uncommitted backup transactions and test rolling back using 
utility.
+     */
+    public void testKillHangingRemoteTransactions() throws Exception {
+        final int cnt = 3;
+
+        startGridsMultiThreaded(cnt);
+
+        Ignite[] clients = new Ignite[] {
+            startGrid("client1"),
+            startGrid("client2"),
+            startGrid("client3"),
+            startGrid("client4")
+        };
+
+        clients[0].getOrCreateCache(new 
CacheConfiguration<>(DEFAULT_CACHE_NAME).
+            setBackups(2).
+            setAtomicityMode(TRANSACTIONAL).
+            setWriteSynchronizationMode(FULL_SYNC).
+            setAffinity(new RendezvousAffinityFunction(false, 64)));
+
+        for (Ignite client : clients) {
+            assertTrue(client.configuration().isClientMode());
+
+            assertNotNull(client.cache(DEFAULT_CACHE_NAME));
+        }
+
+        LongAdder progress = new LongAdder();
+
+        AtomicInteger idx = new AtomicInteger();
+
+        int tc = clients.length;
+
+        CountDownLatch lockLatch = new CountDownLatch(1);
+        CountDownLatch commitLatch = new CountDownLatch(1);
+
+        Ignite prim = primaryNode(0L, DEFAULT_CACHE_NAME);
+
+        TestRecordingCommunicationSpi primSpi = 
TestRecordingCommunicationSpi.spi(prim);
+
+        primSpi.blockMessages(new IgniteBiPredicate<ClusterNode, Message>() {
+            @Override public boolean apply(ClusterNode node, Message message) {
+                return message instanceof GridDhtTxFinishRequest;
+            }
+        });
+
+        IgniteInternalFuture<?> fut = multithreadedAsync(new Runnable() {
+            @Override public void run() {
+                int id = idx.getAndIncrement();
+
+                Ignite client = clients[id];
+
+                try (Transaction tx = 
client.transactions().txStart(PESSIMISTIC, READ_COMMITTED, 0, 1)) {
+                    IgniteCache<Long, Long> cache = 
client.cache(DEFAULT_CACHE_NAME);
+
+                    if (id != 0)
+                        U.awaitQuiet(lockLatch);
+
+                    cache.invoke(0L, new IncrementClosure(), null);
+
+                    if (id == 0) {
+                        lockLatch.countDown();
+
+                        U.awaitQuiet(commitLatch);
+
+                        doSleep(500); // Wait until candidates will enqueue.
+                    }
+
+                    tx.commit();
+                }
+                catch (Exception e) {
+                    assertTrue(X.hasCause(e, 
TransactionTimeoutException.class));
+                }
+
+                progress.increment();
+
+            }
+        }, tc, "invoke-thread");
+
+        U.awaitQuiet(lockLatch);
+
+        commitLatch.countDown();
+
+        primSpi.waitForBlocked(clients.length);
+
+        // Unblock only finish messages from clients from 2 to 4.
+        primSpi.stopBlock(true, new 
IgnitePredicate<T2<ClusterNode,GridIoMessage>>() {
+            @Override public boolean apply(T2<ClusterNode, GridIoMessage> 
objects) {
+                GridIoMessage iom = objects.get2();
+
+                Message m = iom.message();
+
+                if (m instanceof GridDhtTxFinishRequest) {
+                    GridDhtTxFinishRequest r = (GridDhtTxFinishRequest)m;
+
+                    if 
(r.nearNodeId().equals(clients[0].cluster().localNode().id()))
+                        return false;
+                }
+
+                return true;
+            }
+        });
+
+        // Wait until queue is stable
+        for (Ignite ignite : G.allGrids()) {
+            if (ignite.configuration().isClientMode())
+                continue;
+
+            Collection<IgniteInternalTx> txs = 
((IgniteEx)ignite).context().cache().context().tm().activeTransactions();
+
+            GridTestUtils.waitForCondition(new GridAbsPredicate() {
+                @Override public boolean apply() {
+                    for (IgniteInternalTx tx : txs)
+                        if (!tx.local()) {
+                            IgniteTxEntry entry = 
tx.writeEntries().iterator().next();
+
+                            GridCacheEntryEx cached = entry.cached();
+
+                            Collection<GridCacheMvccCandidate> candidates = 
cached.remoteMvccSnapshot();
+
+                            if (candidates.size() != clients.length)
+                                return false;
+                        }
+
+                    return true;
+                }
+            }, 10_000);
+        }
+
+        CommandHandler h = new CommandHandler();
+
+        // Check listing.
+        validate(h, map -> {
+            for (int i = 0; i < cnt; i++) {
+                IgniteEx grid = grid(i);
+
+                // Skip primary.
+                if 
(grid.localNode().id().equals(prim.cluster().localNode().id()))
+                    continue;
+
+                VisorTxTaskResult res = map.get(grid.localNode());
+
+                // Validate queue length on backups.
+                assertEquals(clients.length, res.getInfos().size());
+            }
+        }, "--tx");
+
+        // Check kill.
+        validate(h, map -> {
+            // No-op.
+        }, "--tx", "kill");
+
+        // Wait for all remote txs to finish.
+        for (Ignite ignite : G.allGrids()) {
+            if (ignite.configuration().isClientMode())
+                continue;
+
+            Collection<IgniteInternalTx> txs = 
((IgniteEx)ignite).context().cache().context().tm().activeTransactions();
+
+            for (IgniteInternalTx tx : txs)
+                if (!tx.local())
+                    tx.finishFuture().get();
+        }
+
+        // Unblock finish message from client1.
+        primSpi.stopBlock(true);
+
+        fut.get();
+
+        Long cur = (Long)clients[0].cache(DEFAULT_CACHE_NAME).get(0L);
+
+        assertEquals(tc - 1, cur.longValue());
+
+        checkFutures();
+    }
+
+    /**
      * Test baseline add items works via control.sh
      *
      * @throws Exception If failed.
@@ -1006,6 +1272,23 @@ public class GridCommandHandlerTest extends 
GridCommonAbstractTest {
                 log.info("Waiting for future: " + fut);
 
             assertTrue("Expecting no active futures: node=" + 
ig.localNode().id(), futs.isEmpty());
+
+            Collection<IgniteInternalTx> txs = 
ig.context().cache().context().tm().activeTransactions();
+
+            for (IgniteInternalTx tx : txs)
+                log.info("Waiting for tx: " + tx);
+
+            assertTrue("Expecting no active transactions: node=" + 
ig.localNode().id(), txs.isEmpty());
+        }
+    }
+
+    /** */
+    private static class IncrementClosure implements EntryProcessor<Long, 
Long, Void> {
+        /** {@inheritDoc} */
+        @Override public Void process(MutableEntry<Long, Long> entry, 
Object... arguments) throws EntryProcessorException {
+            entry.setValue(entry.exists() ? entry.getValue() + 1 : 0);
+
+            return null;
         }
     }
 

Reply via email to