This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new af0f0a7  IGNITE-13429 Integration test of control.sh transactions' 
management (#8239)
af0f0a7 is described below

commit af0f0a7b13c3e9b78900c818bc9ca4040d927953
Author: Ivan Daschinskiy <ivanda...@gmail.com>
AuthorDate: Tue Sep 15 17:26:18 2020 +0300

    IGNITE-13429 Integration test of control.sh transactions' management (#8239)
---
 .../LongRunningTransactionsGenerator.java          | 157 +++++++++++++++
 .../ducktest/utils/IgniteAwareApplication.java     |   6 +-
 .../tests/ignitetest/services/ignite_app.py        |  15 +-
 .../ignitetest/services/utils/control_utility.py   | 221 +++++++++++++++++++--
 .../fast_suite.yml => control_utility/__init__.py} |  17 +-
 .../baseline_test.py}                              |  12 +-
 .../ignitetest/tests/control_utility/tx_test.py    | 192 ++++++++++++++++++
 .../tests/ignitetest/tests/suites/fast_suite.yml   |   2 +-
 modules/ducktests/tests/setup.py                   |   2 +-
 modules/ducktests/tests/tox.ini                    |   1 +
 10 files changed, 582 insertions(+), 43 deletions(-)

diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java
new file mode 100644
index 0000000..3bbf732c
--- /dev/null
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.control_utility;
+
+import java.time.Duration;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
+import javax.cache.CacheException;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.transactions.Transaction;
+import org.apache.ignite.transactions.TransactionRollbackException;
+
+/**
+ * Application that starts long running transactions and keeps them pending until they are rolled back
+ * or the application is terminated.
+ * <p>
+ * Every transaction is blocked on a key that has been explicitly locked beforehand by a dedicated thread.
+ * Parameters are read from the JSON node passed to {@link #run(JsonNode)}:
+ * <ul>
+ *     <li>{@code cache_name} - name of the cache to operate on (required);</li>
+ *     <li>{@code tx_count} - number of transactions to start, 1 by default;</li>
+ *     <li>{@code tx_size} - number of keys written by each transaction, 1 by default;</li>
+ *     <li>{@code key_prefix} - prefix of generated keys, {@code KEY_} by default;</li>
+ *     <li>{@code label} - optional transaction label;</li>
+ *     <li>{@code wait_for_topology_version} - optional topology version to wait for before starting.</li>
+ * </ul>
+ */
+public class LongRunningTransactionsGenerator extends IgniteAwareApplication {
+    /** Maximum time to wait for the expected topology version. */
+    private static final Duration TOPOLOGY_WAIT_TIMEOUT = Duration.ofSeconds(60);
+
+    /** Message logged once all blocking keys are locked. */
+    private static final String KEYS_LOCKED_MESSAGE = "APPLICATION_KEYS_LOCKED";
+
+    /** Default prefix of generated keys. */
+    private static final String LOCKED_KEY_PREFIX = "KEY_";
+
+    /** Pool that runs both the lock holders and the transactions. */
+    private volatile Executor pool;
+
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        IgniteCache<String, String> cache = ignite.cache(jsonNode.get("cache_name").asText());
+
+        int txCount = jsonNode.get("tx_count") != null ? jsonNode.get("tx_count").asInt() : 1;
+
+        int txSize = jsonNode.get("tx_size") != null ? jsonNode.get("tx_size").asInt() : 1;
+
+        String keyPrefix = jsonNode.get("key_prefix") != null ? jsonNode.get("key_prefix").asText() : LOCKED_KEY_PREFIX;
+
+        String label = jsonNode.get("label") != null ? jsonNode.get("label").asText() : null;
+
+        long expectedTopologyVersion = jsonNode.get("wait_for_topology_version") != null ?
+            jsonNode.get("wait_for_topology_version").asLong() : -1L;
+
+        CountDownLatch lockLatch = new CountDownLatch(txCount);
+
+        // One thread per lock holder plus one thread per blocked transaction.
+        pool = Executors.newFixedThreadPool(2 * txCount);
+
+        markInitialized();
+
+        if (expectedTopologyVersion > 0)
+            awaitTopologyVersion(expectedTopologyVersion);
+
+        for (int i = 0; i < txCount; i++) {
+            String key = keyPrefix + i;
+
+            pool.execute(() -> {
+                Lock lock = cache.lock(key);
+
+                lock.lock();
+
+                try {
+                    lockLatch.countDown();
+
+                    // Hold the lock until the application is asked to terminate.
+                    while (!terminated())
+                        Thread.sleep(100L);
+                }
+                catch (InterruptedException e) {
+                    markBroken(new RuntimeException("Unexpected thread interruption", e));
+
+                    Thread.currentThread().interrupt();
+                }
+                finally {
+                    lock.unlock();
+                }
+            });
+        }
+
+        lockLatch.await();
+
+        log.info(KEYS_LOCKED_MESSAGE);
+
+        CountDownLatch txLatch = new CountDownLatch(txCount);
+
+        for (int i = 0; i < txCount; i++) {
+            Map<String, String> data = new TreeMap<>();
+
+            for (int j = 0; j < txSize; j++) {
+                // The first key of each transaction is the one locked above, so the transaction blocks on it.
+                String key = keyPrefix + (j == 0 ? String.valueOf(i) : i + "_" + j);
+
+                data.put(key, key);
+            }
+
+            IgniteTransactions igniteTransactions = label != null ? ignite.transactions().withLabel(label) :
+                ignite.transactions();
+
+            pool.execute(() -> {
+                IgniteUuid xid = null;
+
+                try (Transaction tx = igniteTransactions.txStart()) {
+                    xid = tx.xid();
+
+                    cache.putAll(data);
+
+                    tx.commit();
+                }
+                catch (Exception e) {
+                    // A rollback is the expected outcome: record the xid. Anything else breaks the application.
+                    // No explicit null check is needed: instanceof is false for a null cause.
+                    if (e instanceof CacheException && e.getCause() instanceof TransactionRollbackException)
+                        recordResult("TX_ID", xid != null ? xid.toString() : "");
+                    else
+                        markBroken(new RuntimeException("Transaction is rolled back with unexpected error", e));
+                }
+                finally {
+                    txLatch.countDown();
+                }
+            });
+        }
+
+        txLatch.await();
+
+        markFinished();
+    }
+
+    /**
+     * Waits until the cluster topology version reaches the expected one or {@link #TOPOLOGY_WAIT_TIMEOUT} elapses.
+     * The method returns normally in both cases.
+     *
+     * @param expTopVer Expected topology version.
+     * @throws InterruptedException If the waiting thread is interrupted.
+     */
+    private void awaitTopologyVersion(long expTopVer) throws InterruptedException {
+        log.info("Start waiting for topology version: " + expTopVer + ", " +
+            "current version is: " + ignite.cluster().topologyVersion());
+
+        long start = System.nanoTime();
+
+        // Elapsed time is "now - start". The reversed subtraction is always negative, so the timeout never fired.
+        while (ignite.cluster().topologyVersion() < expTopVer
+            && Duration.ofNanos(System.nanoTime() - start).compareTo(TOPOLOGY_WAIT_TIMEOUT) < 0)
+            Thread.sleep(100L);
+
+        log.info("Finished waiting for topology version: " + expTopVer + ", " +
+            "current version is: " + ignite.cluster().topologyVersion());
+    }
+}
diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
index a74b53f..35760b3 100644
--- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -144,18 +144,18 @@ public abstract class IgniteAwareApplication {
     /**
      *
      */
-    protected void markBroken(Throwable th) {
+    public void markBroken(Throwable th) {
         log.info("Marking as broken.");
 
         synchronized (stateMux) {
+            recordResult("ERROR", th.toString());
+
             if (broken) {
                 log.info("Already marked as broken.");
 
                 return;
             }
 
-            recordResult("ERROR", th.toString());
-
             assert !finished;
 
             log.error(APP_BROKEN);
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py 
b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index 9e396f2..74b7fba 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -107,12 +107,23 @@ class IgniteApplicationService(IgniteAwareService):
         :param name: Result parameter's name.
         :return: Extracted result of application run.
         """
-        res = ""
+        results = self.extract_results(name)
+
+        assert len(results) <= 1, f"Expected exactly one result occurence, 
{len(results)} found."
+
+        return results[0] if results else ""
+
+    def extract_results(self, name):
+        """
+        :param name: Results parameter's name.
+        :return: Extracted results of application run.
+        """
+        res = []
 
         output = self.nodes[0].account.ssh_capture(
             "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), 
allow_fail=False)
 
         for line in output:
-            res = re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1)
+            res.append(re.search("%s(.*)%s" % (name + "->", "<-"), 
line).group(1))
 
         return res
diff --git 
a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py 
b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
index 4b194e6..5e65bca 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py
@@ -16,9 +16,11 @@
 """
 This module contains control utility wrapper.
 """
+
 import random
 import re
-from collections import namedtuple
+import time
+from typing import NamedTuple
 
 from ducktape.cluster.remoteaccount import RemoteCommandError
 
@@ -52,10 +54,10 @@ class ControlUtility:
         :param baseline: Baseline nodes or topology version to set as baseline.
         """
         if isinstance(baseline, int):
-            result = self.__run("--baseline version %d --yes" % baseline)
+            result = self.__run(f"--baseline version {baseline} --yes")
         else:
-            result = self.__run("--baseline set %s --yes" %
-                                ",".join([node.account.externally_routable_ip 
for node in baseline]))
+            result = self.__run(
+                f"--baseline set 
{','.join([node.account.externally_routable_ip for node in baseline])} --yes")
 
         return self.__parse_cluster_state(result)
 
@@ -63,8 +65,8 @@ class ControlUtility:
         """
         :param nodes: Nodes that should be added to baseline.
         """
-        result = self.__run("--baseline add %s --yes" %
-                            ",".join([node.account.externally_routable_ip for 
node in nodes]))
+        result = self.__run(
+            f"--baseline add {','.join([node.account.externally_routable_ip 
for node in nodes])} --yes")
 
         return self.__parse_cluster_state(result)
 
@@ -72,8 +74,8 @@ class ControlUtility:
         """
         :param nodes: Nodes that should be removed to baseline.
         """
-        result = self.__run("--baseline remove %s --yes" %
-                            ",".join([node.account.externally_routable_ip for 
node in nodes]))
+        result = self.__run(
+            f"--baseline remove {','.join([node.account.externally_routable_ip 
for node in nodes])} --yes")
 
         return self.__parse_cluster_state(result)
 
@@ -88,8 +90,8 @@ class ControlUtility:
         Enable baseline auto adjust.
         :param timeout: Auto adjust timeout in millis.
         """
-        timeout_str = "timeout %d" % timeout if timeout else ""
-        return self.__run("--baseline auto_adjust enable %s --yes" % 
timeout_str)
+        timeout_str = f"timeout {timeout}" if timeout else ""
+        return self.__run(f"--baseline auto_adjust enable {timeout_str} --yes")
 
     def activate(self):
         """
@@ -103,6 +105,124 @@ class ControlUtility:
         """
         return self.__run("--deactivate --yes")
 
+    def tx(self, **kwargs):
+        """
+        Get list of transactions, various filters can be applied.
+        """
+        output = self.__run(self.__tx_command(**kwargs))
+        res = self.__parse_tx_list(output)
+        return res if res else output
+
+    def tx_info(self, xid):
+        """
+        Get verbose transaction info by xid.
+        """
+        return self.__parse_tx_info(self.__run(f"--tx --info {xid}"))
+
+    def tx_kill(self, **kwargs):
+        """
+        Kill transaction by xid or by various filter.
+        """
+        output = self.__run(self.__tx_command(kill=True, **kwargs))
+        res = self.__parse_tx_list(output)
+        return res if res else output
+
+    @staticmethod
+    def __tx_command(**kwargs):
+        tokens = ["--tx"]
+
+        if 'xid' in kwargs:
+            tokens.append(f"--xid {kwargs['xid']}")
+
+        if kwargs.get('clients'):
+            tokens.append("--clients")
+
+        if kwargs.get('servers'):
+            tokens.append("--servers")
+
+        if 'min_duration' in kwargs:
+            tokens.append(f"--min-duration {kwargs.get('min_duration')}")
+
+        if 'min_size' in kwargs:
+            tokens.append(f"--min-size {kwargs.get('min_size')}")
+
+        if 'label_pattern' in kwargs:
+            tokens.append(f"--label {kwargs['label_pattern']}")
+
+        if kwargs.get("nodes"):
+            tokens.append(f"--nodes {','.join(kwargs.get('nodes'))}")
+
+        if 'limit' in kwargs:
+            tokens.append(f"--limit {kwargs['limit']}")
+
+        if 'order' in kwargs:
+            tokens.append(f"--order {kwargs['order']}")
+
+        if kwargs.get('kill'):
+            tokens.append("--kill --yes")
+
+        return " ".join(tokens)
+
+    @staticmethod
+    def __parse_tx_info(output):
+        tx_info_pattern = re.compile(
+            "Near XID version: (?P<xid_full>GridCacheVersion \\[topVer=\\d+, 
order=\\d+, nodeOrder=\\d+\\])\\n\\s+"
+            "Near XID version \\(UUID\\): (?P<xid>[^\\s]+)\\n\\s+"
+            "Isolation: (?P<isolation>[^\\s]+)\\n\\s+"
+            "Concurrency: (?P<concurrency>[^\\s]+)\\n\\s+"
+            "Timeout: (?P<timeout>\\d+)\\n\\s+"
+            "Initiator node: (?P<initiator_id>[^\\s]+)\\n\\s+"
+            "Initiator node \\(consistent ID\\): 
(?P<initiator_consistent_id>[^\\s+]+)\\n\\s+"
+            "Label: (?P<label>[^\\s]+)\\n\\s+Topology version: 
AffinityTopologyVersion "
+            "\\[topVer=(?P<top_ver>\\d+), 
minorTopVer=(?P<minor_top_ver>\\d+)\\]\\n\\s+"
+            "Used caches \\(ID to name\\): {(?P<caches>.*)}\\n\\s+"
+            "Used cache groups \\(ID to name\\): {(?P<cache_groups>.*)}\\n\\s+"
+            "States across the cluster: \\[(?P<states>.*)\\]"
+        )
+
+        match = tx_info_pattern.search(output)
+
+        str_fields = ['xid', 'xid_full', 'label', 'timeout', 'isolation', 
'concurrency', 'initiator_id',
+                      'initiator_consistent_id']
+        dict_fields = ['caches', 'cache_groups']
+
+        if match:
+            kwargs = {v: match.group(v) for v in str_fields}
+            kwargs['timeout'] = int(match.group('timeout'))
+            kwargs.update({v: parse_dict(match.group(v)) for v in dict_fields})
+            kwargs['top_ver'] = (int(match.group('top_ver')), 
int(match.group('minor_top_ver')))
+            kwargs['states'] = parse_list(match.group('states'))
+
+            return TxVerboseInfo(**kwargs)
+
+        return None
+
+    @staticmethod
+    def __parse_tx_list(output):
+        tx_pattern = re.compile(
+            "Tx: \\[xid=(?P<xid>[^\\s]+), "
+            "label=(?P<label>[^\\s]+), state=(?P<state>[^\\s]+), "
+            "startTime=(?P<start_time>\\d{4}-\\d{2}-\\d{2} 
\\d{2}:\\d{2}:\\d{2}.\\d{3}), duration=(?P<duration>\\d+), "
+            "isolation=(?P<isolation>[^\\s]+), 
concurrency=(?P<concurrency>[^\\s]+), "
+            "topVer=AffinityTopologyVersion \\[topVer=(?P<top_ver>\\d+), 
minorTopVer=(?P<minor_top_ver>\\d+)\\], "
+            "timeout=(?P<timeout>\\d+), size=(?P<size>\\d+), 
dhtNodes=\\[(?P<dht_nodes>.*)\\], "
+            "nearXid=(?P<near_xid>[^\\s]+), 
parentNodeIds=\\[(?P<parent_nodes>.*)\\]\\]")
+
+        str_fields = ['xid', 'label', 'state', 'isolation', 'concurrency', 
'near_xid']
+        int_fields = ['timeout', 'size', 'duration']
+        list_fields = ['parent_nodes', 'dht_nodes']
+
+        tx_list = []
+        for match in tx_pattern.finditer(output):
+            kwargs = {v: match.group(v) for v in str_fields}
+            kwargs.update({v: int(match.group(v)) for v in int_fields})
+            kwargs['top_ver'] = (int(match.group('top_ver')), 
int(match.group('minor_top_ver')))
+            kwargs.update({v: parse_list(match.group(v)) for v in list_fields})
+            kwargs['start_time'] = time.strptime(match.group('start_time'), 
"%Y-%m-%d %H:%M:%S.%f")
+            tx_list.append(TxInfo(**kwargs))
+
+        return tx_list
+
     @staticmethod
     def __parse_cluster_state(output):
         state_pattern = re.compile("Cluster state: (?P<cluster_state>[^\\s]+)")
@@ -127,12 +247,12 @@ class ControlUtility:
     def __run(self, cmd):
         node = random.choice(self.__alives())
 
-        self.logger.debug("Run command %s on node %s", cmd, node.name)
+        self.logger.debug(f"Run command {cmd} on node {node.name}")
 
         raw_output = node.account.ssh_capture(self.__form_cmd(node, cmd), 
allow_fail=True)
         code, output = self.__parse_output(raw_output)
 
-        self.logger.debug("Output of command %s on node %s, exited with code 
%d, is %s", cmd, node.name, code, output)
+        self.logger.debug(f"Output of command {cmd} on node {node.name}, 
exited with code {code}, is {output}")
 
         if code != 0:
             raise ControlUtilityError(node.account, cmd, code, output)
@@ -140,8 +260,7 @@ class ControlUtility:
         return output
 
     def __form_cmd(self, node, cmd):
-        return self._cluster.spec.path.script("%s --host %s %s" %
-                                              (self.BASE_COMMAND, 
node.account.externally_routable_ip, cmd))
+        return self._cluster.spec.path.script(f"{self.BASE_COMMAND} --host 
{node.account.externally_routable_ip} {cmd}")
 
     @staticmethod
     def __parse_output(raw_output):
@@ -159,8 +278,59 @@ class ControlUtility:
         return [node for node in self._cluster.nodes if 
self._cluster.alive(node)]
 
 
-BaselineNode = namedtuple("BaselineNode", ["consistent_id", "state", "order"])
-ClusterState = namedtuple("ClusterState", ["state", "topology_version", 
"baseline"])
+class BaselineNode(NamedTuple):
+    """
+    Baseline node info.
+    """
+    consistent_id: str
+    state: str
+    order: int
+
+
+class ClusterState(NamedTuple):
+    """
+    Cluster state info.
+    """
+    state: str
+    topology_version: int
+    baseline: list
+
+
+class TxInfo(NamedTuple):
+    """
+    Transaction info: one "Tx: [...]" entry of the control utility "--tx" output.
+    """
+    # Value of "xid=".
+    xid: str
+    # Value of "nearXid=".
+    near_xid: str
+    # Value of "label=", kept as printed by the utility.
+    label: str
+    # Value of "state=".
+    state: str
+    # "startTime=" parsed with time.strptime.
+    start_time: time.struct_time
+    # Value of "duration=" as an integer; presumably seconds - TODO confirm against control.sh output.
+    duration: int
+    # Value of "isolation=".
+    isolation: str
+    # Value of "concurrency=".
+    concurrency: str
+    # Pair (topVer, minorTopVer) taken from AffinityTopologyVersion.
+    top_ver: tuple
+    # Value of "timeout=" as an integer.
+    timeout: int
+    # Value of "size=" as an integer.
+    size: int
+    # Items of "dhtNodes=[...]".
+    # NOTE(review): the default list object is shared between instances created without this field.
+    dht_nodes: list = []
+    # Items of "parentNodeIds=[...]".
+    # NOTE(review): same shared mutable default as above.
+    parent_nodes: list = []
+
+
+class TxVerboseInfo(NamedTuple):
+    """
+    Transaction info returned with --info
+    """
+    # "Near XID version (UUID)" value.
+    xid: str
+    # "Near XID version" value: the full "GridCacheVersion [...]" text.
+    xid_full: str
+    # "Label" value, kept as printed by the utility.
+    label: str
+    # "Isolation" value.
+    isolation: str
+    # "Concurrency" value.
+    concurrency: str
+    # "Timeout" value as an integer.
+    timeout: int
+    # Pair (topVer, minorTopVer) taken from "Topology version".
+    top_ver: tuple
+    # "Initiator node" value.
+    initiator_id: str
+    # "Initiator node (consistent ID)" value.
+    initiator_consistent_id: str
+    # "Used caches (ID to name)" parsed into a dict of strings.
+    caches: dict
+    # "Used cache groups (ID to name)" parsed into a dict of strings.
+    cache_groups: dict
+    # Items of "States across the cluster".
+    states: list
 
 
 class ControlUtilityError(RemoteCommandError):
@@ -169,3 +339,22 @@ class ControlUtilityError(RemoteCommandError):
     """
     def __init__(self, account, cmd, exit_status, output):
         super().__init__(account, cmd, exit_status, "".join(output))
+
+
+def parse_dict(raw):
+    """
+    Parse java Map.toString() to python dict.
+    """
+    res = {}
+    for token in raw.split(','):
+        key, value = tuple(token.strip().split('='))
+        res[key] = value
+
+    return res
+
+
+def parse_list(raw):
+    """
+    Parse java List.toString() to python list
+    """
+    return [token.strip() for token in raw.split(',')]
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml 
b/modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py
similarity index 77%
copy from modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
copy to modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py
index 4811574..1540a34 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py
@@ -13,17 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-smoke:
-  - ../smoke_test.py
-
-control_utility:
-  - ../control_utility_test.py
-
-pme_free_switch:
-  - ../pme_free_switch_test.py
-
-cellular_affinity:
-  - ../cellular_affinity_test.py
-
-rebalance:
-  - ../add_node_rebalance_test.py
+"""
+This package contains control.sh utility tests.
+"""
diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility_test.py 
b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
similarity index 95%
rename from modules/ducktests/tests/ignitetest/tests/control_utility_test.py
rename to 
modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
index a30a381..a1de09f 100644
--- a/modules/ducktests/tests/ignitetest/tests/control_utility_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 """
-This module contains control.sh utility tests.
+This module contains manipulating baseline test through control utility.
 """
 
 from ducktape.mark.resource import cluster
@@ -27,7 +27,7 @@ from 
ignitetest.services.utils.ignite_configuration.data_storage import DataRegi
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
 from ignitetest.utils import version_if, ignite_versions
 from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion, 
LATEST_2_7, V_2_8_0
+from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion, 
V_2_8_0
 
 
 # pylint: disable=W0223
@@ -38,7 +38,7 @@ class BaselineTests(IgniteTest):
     NUM_NODES = 3
 
     @cluster(num_nodes=NUM_NODES)
-    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
     def test_baseline_set(self, ignite_version):
         """
         Test baseline set.
@@ -74,7 +74,7 @@ class BaselineTests(IgniteTest):
         self.__check_nodes_in_baseline(new_node.nodes, baseline)
 
     @cluster(num_nodes=NUM_NODES)
-    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
     def test_baseline_add_remove(self, ignite_version):
         """
         Test add and remove nodes from baseline.
@@ -116,7 +116,7 @@ class BaselineTests(IgniteTest):
         self.__check_nodes_not_in_baseline(new_node.nodes, baseline)
 
     @cluster(num_nodes=NUM_NODES)
-    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
     def test_activate_deactivate(self, ignite_version):
         """
         Test activate and deactivate cluster.
@@ -139,7 +139,7 @@ class BaselineTests(IgniteTest):
 
     @cluster(num_nodes=NUM_NODES)
     @version_if(lambda version: version >= V_2_8_0)
-    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7))
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
     def test_baseline_autoadjust(self, ignite_version):
         """
         Test activate and deactivate cluster.
diff --git 
a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py 
b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
new file mode 100644
index 0000000..5aca06e
--- /dev/null
+++ b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py
@@ -0,0 +1,192 @@
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+This module contains transactions manipulation test through control utility.
+"""
+import random
+
+from ducktape.mark.resource import cluster
+
+from ignitetest.services.ignite import IgniteService
+from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.control_utility import ControlUtility
+from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
+from ignitetest.services.utils.ignite_configuration.cache import 
CacheConfiguration
+from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
+from ignitetest.utils import ignite_versions
+from ignitetest.utils.ignite_test import IgniteTest
+from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion
+
+
+# pylint: disable=W0223
+class TransactionsTests(IgniteTest):
+    """
+    Tests control.sh transaction management command.
+    """
+    NUM_NODES = 4
+    CACHE_NAME = "TEST"
+
+    @cluster(num_nodes=NUM_NODES)
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+    def test_tx_info(self, ignite_version):
+        """
+        Tests verbose tx info for specific xid.
+        """
+        servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2)
+
+        long_tx = self.__start_tx_app(ignite_version, servers, 
cache_name=self.CACHE_NAME, tx_count=2, tx_size=2,
+                                      key_prefix='TX_1_KEY_')
+
+        wait_for_key_locked(long_tx)
+
+        control_utility = ControlUtility(servers, self.test_context)
+
+        transactions = control_utility.tx()
+
+        pick_tx = random.choice(transactions)
+
+        res = control_utility.tx_info(pick_tx.xid)
+
+        assert res.xid == pick_tx.xid
+        assert res.timeout == pick_tx.timeout
+        assert res.top_ver == pick_tx.top_ver
+        assert res.label == pick_tx.label
+
+    @cluster(num_nodes=NUM_NODES)
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+    def test_kill_tx(self, ignite_version):
+        """
+        Test kill transactions by xid and filter.
+        """
+        servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2)
+
+        tx_count = 3
+
+        long_tx_1 = self.__start_tx_app(ignite_version, servers, 
cache_name=self.CACHE_NAME, tx_count=tx_count,
+                                        tx_size=2, key_prefix='TX_1_KEY_', 
label='TX_1', wait_for_topology_version=4)
+
+        long_tx_2 = self.__start_tx_app(ignite_version, servers, 
cache_name=self.CACHE_NAME, tx_count=tx_count,
+                                        tx_size=2, key_prefix='TX_2_KEY_', 
label='TX_2', wait_for_topology_version=4)
+
+        wait_for_key_locked(long_tx_1, long_tx_2)
+
+        control_utility = ControlUtility(servers, self.test_context)
+
+        # check kill with specific xid.
+        transactions = control_utility.tx(label_pattern='TX_1')
+        res = control_utility.tx_kill(xid=random.choice(transactions).xid)
+        assert res and len(res) == 1 and res[0].xid == 
long_tx_1.extract_result("TX_ID")
+
+        # check kill with filter.
+        res = control_utility.tx_kill(label_pattern='TX_2')
+        assert res and len(res) == tx_count and set(map(lambda x: x.xid, res))\
+            .issubset(set(long_tx_2.extract_results("TX_ID")))
+
+    @cluster(num_nodes=NUM_NODES)
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+    def test_tx_filter(self, ignite_version):
+        """
+        Test filtering transactions list.
+        """
+        servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2)
+
+        client_tx_count, client_tx_size = 5, 4
+        server_tx_count, server_tx_size = 3, 2
+
+        servers = self.__start_tx_app(ignite_version, servers, 
client_mode=False, cache_name=self.CACHE_NAME,
+                                      tx_count=server_tx_count, 
tx_size=server_tx_size, key_prefix='TX_1_KEY_',
+                                      label='LBL_SERVER', 
wait_for_topology_version=4)
+
+        clients = self.__start_tx_app(ignite_version, servers, 
cache_name=self.CACHE_NAME, tx_count=client_tx_count,
+                                      tx_size=client_tx_size, 
key_prefix='TX_2_KEY_', label='LBL_CLIENT',
+                                      wait_for_topology_version=4)
+
+        wait_for_key_locked(clients, servers)
+        control_utility = ControlUtility(servers, self.test_context)
+
+        start_check = self.monotonic()
+        assert len(control_utility.tx(clients=True, label_pattern='LBL_.*')) 
== client_tx_count
+        assert len(control_utility.tx(servers=True, label_pattern='LBL_.*')) 
== server_tx_count
+
+        # limit to 2 transactions on each node, therefore 4 total.
+        assert len(control_utility.tx(limit=2, label_pattern='LBL_.*')) == 4
+
+        assert len(control_utility.tx(label_pattern='LBL_.*')) == 
client_tx_count + server_tx_count
+
+        # filter transactions with keys size greater or equal to min_size.
+        assert len(control_utility.tx(min_size=client_tx_size, 
label_pattern='LBL_.*')) == client_tx_count
+
+        server_nodes = [node.consistent_id for node in servers.nodes]
+        assert len(control_utility.tx(label_pattern='LBL_.*', 
nodes=server_nodes)) == server_tx_count
+
+        # test ordering.
+        for order_type in ['DURATION', 'SIZE', 'START_TIME']:
+            transactions = control_utility.tx(label_pattern='LBL_.*', 
order=order_type)
+            assert is_sorted(transactions, key=lambda x, attr=order_type: 
getattr(x, attr.lower()), reverse=True)
+
+        # test min_duration filtering.
+        min_duration = int(self.monotonic() - start_check)
+        transactions = control_utility.tx(min_duration=min_duration, 
label_pattern='LBL_.*')
+        assert len(transactions) == server_tx_count + client_tx_count
+        for tx in transactions:
+            assert tx.duration >= min_duration
+
+    def __start_tx_app(self, version, servers, *, client_mode=True, **kwargs):
+        app_params = {
+            'config': IgniteConfiguration(version=IgniteVersion(version),
+                                          client_mode=client_mode,
+                                          
discovery_spi=from_ignite_cluster(servers)),
+            'java_class_name': 
'org.apache.ignite.internal.ducktest.tests.control_utility'
+                               '.LongRunningTransactionsGenerator',
+            'params': kwargs
+        }
+
+        app = IgniteApplicationService(self.test_context, **app_params)
+        app.start()
+
+        return app
+
+    def __start_ignite_nodes(self, version, num_nodes, timeout_sec=60):
+        config = IgniteConfiguration(
+            cluster_state="ACTIVE",
+            version=IgniteVersion(version),
+            caches=[CacheConfiguration(name=self.CACHE_NAME, 
atomicity_mode='TRANSACTIONAL')]
+        )
+
+        servers = IgniteService(self.test_context, config=config, 
num_nodes=num_nodes)
+
+        servers.start(timeout_sec=timeout_sec)
+
+        return servers
+
+
+def wait_for_key_locked(*clusters):
+    """
+    Wait for APPLICATION_KEYS_LOCKED on tx_app nodes.
+    """
+    for cluster_ in clusters:
+        cluster_.await_event("APPLICATION_KEYS_LOCKED", timeout_sec=60, 
from_the_beginning=True)
+
+
+def is_sorted(lst, key=lambda x: x, reverse=False):
+    """
+    Check if list is sorted.
+    """
+    for i, elem in enumerate(lst[1:]):
+        return key(elem) <= key(lst[i]) if not reverse else key(elem) >= 
key(lst[i])
+
+    return True
diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml 
b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
index 4811574..698c1d8 100644
--- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
+++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml
@@ -17,7 +17,7 @@ smoke:
   - ../smoke_test.py
 
 control_utility:
-  - ../control_utility_test.py
+  - ../control_utility
 
 pme_free_switch:
   - ../pme_free_switch_test.py
diff --git a/modules/ducktests/tests/setup.py b/modules/ducktests/tests/setup.py
index 5b89d58..de849ed 100644
--- a/modules/ducktests/tests/setup.py
+++ b/modules/ducktests/tests/setup.py
@@ -30,7 +30,7 @@ setup(name="ignitetest",
       license="apache2.0",
       packages=find_packages(exclude=["ignitetest.tests", 
"ignitetest.tests.*"]),
       include_package_data=True,
-      install_requires=['ducktape==0.8.0'],
+      install_requires=["ducktape==0.8.0", "tox==3.15.2"],
       dependency_links=[
           
'https://github.com/confluentinc/ducktape/tarball/master#egg=ducktape-0.8.0'
       ])
diff --git a/modules/ducktests/tests/tox.ini b/modules/ducktests/tests/tox.ini
index 45107be..7955108 100644
--- a/modules/ducktests/tests/tox.ini
+++ b/modules/ducktests/tests/tox.ini
@@ -33,6 +33,7 @@ commands =
 
 [BASIC]
 min-public-methods=0
+good-names=i,j,k,x,y,ex,pk,tx
 
 [SIMILARITIES]
 ignore-imports=yes

Reply via email to