This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch ignite-ducktape in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push: new af0f0a7 IGNITE-13429 Integration test of control.sh transactions' management (#8239) af0f0a7 is described below commit af0f0a7b13c3e9b78900c818bc9ca4040d927953 Author: Ivan Daschinskiy <ivanda...@gmail.com> AuthorDate: Tue Sep 15 17:26:18 2020 +0300 IGNITE-13429 Integration test of control.sh transactions' management (#8239) --- .../LongRunningTransactionsGenerator.java | 157 +++++++++++++++ .../ducktest/utils/IgniteAwareApplication.java | 6 +- .../tests/ignitetest/services/ignite_app.py | 15 +- .../ignitetest/services/utils/control_utility.py | 221 +++++++++++++++++++-- .../fast_suite.yml => control_utility/__init__.py} | 17 +- .../baseline_test.py} | 12 +- .../ignitetest/tests/control_utility/tx_test.py | 192 ++++++++++++++++++ .../tests/ignitetest/tests/suites/fast_suite.yml | 2 +- modules/ducktests/tests/setup.py | 2 +- modules/ducktests/tests/tox.ini | 1 + 10 files changed, 582 insertions(+), 43 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java new file mode 100644 index 0000000..3bbf732c --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/control_utility/LongRunningTransactionsGenerator.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.control_utility; + +import java.time.Duration; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Lock; +import javax.cache.CacheException; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.IgniteTransactions; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; +import org.apache.ignite.lang.IgniteUuid; +import org.apache.ignite.transactions.Transaction; +import org.apache.ignite.transactions.TransactionRollbackException; + +/** + * Run long running transactions on node with specified param. + */ +public class LongRunningTransactionsGenerator extends IgniteAwareApplication { + /** */ + private static final Duration TOPOLOGY_WAIT_TIMEOUT = Duration.ofSeconds(60); + + /** */ + private static final String KEYS_LOCKED_MESSAGE = "APPLICATION_KEYS_LOCKED"; + + /** */ + private static final String LOCKED_KEY_PREFIX = "KEY_"; + + /** */ + private volatile Executor pool; + + /** {@inheritDoc} */ + @Override protected void run(JsonNode jsonNode) throws Exception { + IgniteCache<String, String> cache = ignite.cache(jsonNode.get("cache_name").asText()); + + int txCount = jsonNode.get("tx_count") != null ? jsonNode.get("tx_count").asInt() : 1; + + int txSize = jsonNode.get("tx_size") != null ? 
jsonNode.get("tx_size").asInt() : 1; + + String keyPrefix = jsonNode.get("key_prefix") != null ? jsonNode.get("key_prefix").asText() : LOCKED_KEY_PREFIX; + + String label = jsonNode.get("label") != null ? jsonNode.get("label").asText() : null; + + long expectedTopologyVersion = jsonNode.get("wait_for_topology_version") != null ? + jsonNode.get("wait_for_topology_version").asLong() : -1L; + + CountDownLatch lockLatch = new CountDownLatch(txCount); + + pool = Executors.newFixedThreadPool(2 * txCount); + + markInitialized(); + + if (expectedTopologyVersion > 0) { + log.info("Start waiting for topology version: " + expectedTopologyVersion + ", " + + "current version is: " + ignite.cluster().topologyVersion()); + + long start = System.nanoTime(); + + while (ignite.cluster().topologyVersion() < expectedTopologyVersion + && Duration.ofNanos(start - System.nanoTime()).compareTo(TOPOLOGY_WAIT_TIMEOUT) < 0) + Thread.sleep(100L); + + log.info("Finished waiting for topology version: " + expectedTopologyVersion + ", " + + "current version is: " + ignite.cluster().topologyVersion()); + } + + for (int i = 0; i < txCount; i++) { + String key = keyPrefix + i; + + pool.execute(() -> { + Lock lock = cache.lock(key); + + lock.lock(); + + try { + lockLatch.countDown(); + + while (!terminated()) + Thread.sleep(100L); + } + catch (InterruptedException e) { + markBroken(new RuntimeException("Unexpected thread interruption", e)); + + Thread.currentThread().interrupt(); + } + finally { + lock.unlock(); + } + }); + } + + lockLatch.await(); + + log.info(KEYS_LOCKED_MESSAGE); + + CountDownLatch txLatch = new CountDownLatch(txCount); + + for (int i = 0; i < txCount; i++) { + Map<String, String> data = new TreeMap<>(); + + for (int j = 0; j < txSize; j++) { + String key = keyPrefix + (j == 0 ? String.valueOf(i) : i + "_" + j); + + data.put(key, key); + } + + IgniteTransactions igniteTransactions = label != null ? 
ignite.transactions().withLabel(label) : + ignite.transactions(); + + pool.execute(() -> { + IgniteUuid xid = null; + + try (Transaction tx = igniteTransactions.txStart()) { + xid = tx.xid(); + + cache.putAll(data); + + tx.commit(); + } + catch (Exception e) { + if (e instanceof CacheException && e.getCause() != null && + e.getCause() instanceof TransactionRollbackException) + recordResult("TX_ID", xid != null ? xid.toString() : ""); + else + markBroken(new RuntimeException("Transaction is rolled back with unexpected error", e)); + } + finally { + txLatch.countDown(); + } + }); + } + + txLatch.await(); + + markFinished(); + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java index a74b53f..35760b3 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java @@ -144,18 +144,18 @@ public abstract class IgniteAwareApplication { /** * */ - protected void markBroken(Throwable th) { + public void markBroken(Throwable th) { log.info("Marking as broken."); synchronized (stateMux) { + recordResult("ERROR", th.toString()); + if (broken) { log.info("Already marked as broken."); return; } - recordResult("ERROR", th.toString()); - assert !finished; log.error(APP_BROKEN); diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py index 9e396f2..74b7fba 100644 --- a/modules/ducktests/tests/ignitetest/services/ignite_app.py +++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py @@ -107,12 +107,23 @@ class IgniteApplicationService(IgniteAwareService): :param name: Result parameter's name. :return: Extracted result of application run. 
""" - res = "" + results = self.extract_results(name) + + assert len(results) <= 1, f"Expected exactly one result occurence, {len(results)} found." + + return results[0] if results else "" + + def extract_results(self, name): + """ + :param name: Results parameter's name. + :return: Extracted results of application run. + """ + res = [] output = self.nodes[0].account.ssh_capture( "grep '%s' %s" % (name + "->", self.STDOUT_STDERR_CAPTURE), allow_fail=False) for line in output: - res = re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1) + res.append(re.search("%s(.*)%s" % (name + "->", "<-"), line).group(1)) return res diff --git a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py index 4b194e6..5e65bca 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/control_utility.py +++ b/modules/ducktests/tests/ignitetest/services/utils/control_utility.py @@ -16,9 +16,11 @@ """ This module contains control utility wrapper. """ + import random import re -from collections import namedtuple +import time +from typing import NamedTuple from ducktape.cluster.remoteaccount import RemoteCommandError @@ -52,10 +54,10 @@ class ControlUtility: :param baseline: Baseline nodes or topology version to set as baseline. """ if isinstance(baseline, int): - result = self.__run("--baseline version %d --yes" % baseline) + result = self.__run(f"--baseline version {baseline} --yes") else: - result = self.__run("--baseline set %s --yes" % - ",".join([node.account.externally_routable_ip for node in baseline])) + result = self.__run( + f"--baseline set {','.join([node.account.externally_routable_ip for node in baseline])} --yes") return self.__parse_cluster_state(result) @@ -63,8 +65,8 @@ class ControlUtility: """ :param nodes: Nodes that should be added to baseline. 
""" - result = self.__run("--baseline add %s --yes" % - ",".join([node.account.externally_routable_ip for node in nodes])) + result = self.__run( + f"--baseline add {','.join([node.account.externally_routable_ip for node in nodes])} --yes") return self.__parse_cluster_state(result) @@ -72,8 +74,8 @@ class ControlUtility: """ :param nodes: Nodes that should be removed to baseline. """ - result = self.__run("--baseline remove %s --yes" % - ",".join([node.account.externally_routable_ip for node in nodes])) + result = self.__run( + f"--baseline remove {','.join([node.account.externally_routable_ip for node in nodes])} --yes") return self.__parse_cluster_state(result) @@ -88,8 +90,8 @@ class ControlUtility: Enable baseline auto adjust. :param timeout: Auto adjust timeout in millis. """ - timeout_str = "timeout %d" % timeout if timeout else "" - return self.__run("--baseline auto_adjust enable %s --yes" % timeout_str) + timeout_str = f"timeout {timeout}" if timeout else "" + return self.__run(f"--baseline auto_adjust enable {timeout_str} --yes") def activate(self): """ @@ -103,6 +105,124 @@ class ControlUtility: """ return self.__run("--deactivate --yes") + def tx(self, **kwargs): + """ + Get list of transactions, various filters can be applied. + """ + output = self.__run(self.__tx_command(**kwargs)) + res = self.__parse_tx_list(output) + return res if res else output + + def tx_info(self, xid): + """ + Get verbose transaction info by xid. + """ + return self.__parse_tx_info(self.__run(f"--tx --info {xid}")) + + def tx_kill(self, **kwargs): + """ + Kill transaction by xid or by various filter. 
+ """ + output = self.__run(self.__tx_command(kill=True, **kwargs)) + res = self.__parse_tx_list(output) + return res if res else output + + @staticmethod + def __tx_command(**kwargs): + tokens = ["--tx"] + + if 'xid' in kwargs: + tokens.append(f"--xid {kwargs['xid']}") + + if kwargs.get('clients'): + tokens.append("--clients") + + if kwargs.get('servers'): + tokens.append("--servers") + + if 'min_duration' in kwargs: + tokens.append(f"--min-duration {kwargs.get('min_duration')}") + + if 'min_size' in kwargs: + tokens.append(f"--min-size {kwargs.get('min_size')}") + + if 'label_pattern' in kwargs: + tokens.append(f"--label {kwargs['label_pattern']}") + + if kwargs.get("nodes"): + tokens.append(f"--nodes {','.join(kwargs.get('nodes'))}") + + if 'limit' in kwargs: + tokens.append(f"--limit {kwargs['limit']}") + + if 'order' in kwargs: + tokens.append(f"--order {kwargs['order']}") + + if kwargs.get('kill'): + tokens.append("--kill --yes") + + return " ".join(tokens) + + @staticmethod + def __parse_tx_info(output): + tx_info_pattern = re.compile( + "Near XID version: (?P<xid_full>GridCacheVersion \\[topVer=\\d+, order=\\d+, nodeOrder=\\d+\\])\\n\\s+" + "Near XID version \\(UUID\\): (?P<xid>[^\\s]+)\\n\\s+" + "Isolation: (?P<isolation>[^\\s]+)\\n\\s+" + "Concurrency: (?P<concurrency>[^\\s]+)\\n\\s+" + "Timeout: (?P<timeout>\\d+)\\n\\s+" + "Initiator node: (?P<initiator_id>[^\\s]+)\\n\\s+" + "Initiator node \\(consistent ID\\): (?P<initiator_consistent_id>[^\\s+]+)\\n\\s+" + "Label: (?P<label>[^\\s]+)\\n\\s+Topology version: AffinityTopologyVersion " + "\\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\]\\n\\s+" + "Used caches \\(ID to name\\): {(?P<caches>.*)}\\n\\s+" + "Used cache groups \\(ID to name\\): {(?P<cache_groups>.*)}\\n\\s+" + "States across the cluster: \\[(?P<states>.*)\\]" + ) + + match = tx_info_pattern.search(output) + + str_fields = ['xid', 'xid_full', 'label', 'timeout', 'isolation', 'concurrency', 'initiator_id', + 
'initiator_consistent_id'] + dict_fields = ['caches', 'cache_groups'] + + if match: + kwargs = {v: match.group(v) for v in str_fields} + kwargs['timeout'] = int(match.group('timeout')) + kwargs.update({v: parse_dict(match.group(v)) for v in dict_fields}) + kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver'))) + kwargs['states'] = parse_list(match.group('states')) + + return TxVerboseInfo(**kwargs) + + return None + + @staticmethod + def __parse_tx_list(output): + tx_pattern = re.compile( + "Tx: \\[xid=(?P<xid>[^\\s]+), " + "label=(?P<label>[^\\s]+), state=(?P<state>[^\\s]+), " + "startTime=(?P<start_time>\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}.\\d{3}), duration=(?P<duration>\\d+), " + "isolation=(?P<isolation>[^\\s]+), concurrency=(?P<concurrency>[^\\s]+), " + "topVer=AffinityTopologyVersion \\[topVer=(?P<top_ver>\\d+), minorTopVer=(?P<minor_top_ver>\\d+)\\], " + "timeout=(?P<timeout>\\d+), size=(?P<size>\\d+), dhtNodes=\\[(?P<dht_nodes>.*)\\], " + "nearXid=(?P<near_xid>[^\\s]+), parentNodeIds=\\[(?P<parent_nodes>.*)\\]\\]") + + str_fields = ['xid', 'label', 'state', 'isolation', 'concurrency', 'near_xid'] + int_fields = ['timeout', 'size', 'duration'] + list_fields = ['parent_nodes', 'dht_nodes'] + + tx_list = [] + for match in tx_pattern.finditer(output): + kwargs = {v: match.group(v) for v in str_fields} + kwargs.update({v: int(match.group(v)) for v in int_fields}) + kwargs['top_ver'] = (int(match.group('top_ver')), int(match.group('minor_top_ver'))) + kwargs.update({v: parse_list(match.group(v)) for v in list_fields}) + kwargs['start_time'] = time.strptime(match.group('start_time'), "%Y-%m-%d %H:%M:%S.%f") + tx_list.append(TxInfo(**kwargs)) + + return tx_list + @staticmethod def __parse_cluster_state(output): state_pattern = re.compile("Cluster state: (?P<cluster_state>[^\\s]+)") @@ -127,12 +247,12 @@ class ControlUtility: def __run(self, cmd): node = random.choice(self.__alives()) - self.logger.debug("Run command %s on node 
%s", cmd, node.name) + self.logger.debug(f"Run command {cmd} on node {node.name}") raw_output = node.account.ssh_capture(self.__form_cmd(node, cmd), allow_fail=True) code, output = self.__parse_output(raw_output) - self.logger.debug("Output of command %s on node %s, exited with code %d, is %s", cmd, node.name, code, output) + self.logger.debug(f"Output of command {cmd} on node {node.name}, exited with code {code}, is {output}") if code != 0: raise ControlUtilityError(node.account, cmd, code, output) @@ -140,8 +260,7 @@ class ControlUtility: return output def __form_cmd(self, node, cmd): - return self._cluster.spec.path.script("%s --host %s %s" % - (self.BASE_COMMAND, node.account.externally_routable_ip, cmd)) + return self._cluster.spec.path.script(f"{self.BASE_COMMAND} --host {node.account.externally_routable_ip} {cmd}") @staticmethod def __parse_output(raw_output): @@ -159,8 +278,59 @@ class ControlUtility: return [node for node in self._cluster.nodes if self._cluster.alive(node)] -BaselineNode = namedtuple("BaselineNode", ["consistent_id", "state", "order"]) -ClusterState = namedtuple("ClusterState", ["state", "topology_version", "baseline"]) +class BaselineNode(NamedTuple): + """ + Baseline node info. + """ + consistent_id: str + state: str + order: int + + +class ClusterState(NamedTuple): + """ + Cluster state info. + """ + state: str + topology_version: int + baseline: list + + +class TxInfo(NamedTuple): + """ + Transaction info. 
+ """ + xid: str + near_xid: str + label: str + state: str + start_time: time.struct_time + duration: int + isolation: str + concurrency: str + top_ver: tuple + timeout: int + size: int + dht_nodes: list = [] + parent_nodes: list = [] + + +class TxVerboseInfo(NamedTuple): + """ + Transaction info returned with --info + """ + xid: str + xid_full: str + label: str + isolation: str + concurrency: str + timeout: int + top_ver: tuple + initiator_id: str + initiator_consistent_id: str + caches: dict + cache_groups: dict + states: list class ControlUtilityError(RemoteCommandError): @@ -169,3 +339,22 @@ class ControlUtilityError(RemoteCommandError): """ def __init__(self, account, cmd, exit_status, output): super().__init__(account, cmd, exit_status, "".join(output)) + + +def parse_dict(raw): + """ + Parse java Map.toString() to python dict. + """ + res = {} + for token in raw.split(','): + key, value = tuple(token.strip().split('=')) + res[key] = value + + return res + + +def parse_list(raw): + """ + Parse java List.toString() to python list + """ + return [token.strip() for token in raw.split(',')] diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py similarity index 77% copy from modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml copy to modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py index 4811574..1540a34 100644 --- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml +++ b/modules/ducktests/tests/ignitetest/tests/control_utility/__init__.py @@ -13,17 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-smoke: - - ../smoke_test.py - -control_utility: - - ../control_utility_test.py - -pme_free_switch: - - ../pme_free_switch_test.py - -cellular_affinity: - - ../cellular_affinity_test.py - -rebalance: - - ../add_node_rebalance_test.py +""" +This package contains control.sh utility tests. +""" diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility_test.py b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py similarity index 95% rename from modules/ducktests/tests/ignitetest/tests/control_utility_test.py rename to modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py index a30a381..a1de09f 100644 --- a/modules/ducktests/tests/ignitetest/tests/control_utility_test.py +++ b/modules/ducktests/tests/ignitetest/tests/control_utility/baseline_test.py @@ -14,7 +14,7 @@ # limitations under the License. """ -This module contains control.sh utility tests. +This module contains manipulating baseline test through control utility. """ from ducktape.mark.resource import cluster @@ -27,7 +27,7 @@ from ignitetest.services.utils.ignite_configuration.data_storage import DataRegi from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster from ignitetest.utils import version_if, ignite_versions from ignitetest.utils.ignite_test import IgniteTest -from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion, LATEST_2_7, V_2_8_0 +from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion, V_2_8_0 # pylint: disable=W0223 @@ -38,7 +38,7 @@ class BaselineTests(IgniteTest): NUM_NODES = 3 @cluster(num_nodes=NUM_NODES) - @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7)) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) def test_baseline_set(self, ignite_version): """ Test baseline set. 
@@ -74,7 +74,7 @@ class BaselineTests(IgniteTest): self.__check_nodes_in_baseline(new_node.nodes, baseline) @cluster(num_nodes=NUM_NODES) - @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7)) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) def test_baseline_add_remove(self, ignite_version): """ Test add and remove nodes from baseline. @@ -116,7 +116,7 @@ class BaselineTests(IgniteTest): self.__check_nodes_not_in_baseline(new_node.nodes, baseline) @cluster(num_nodes=NUM_NODES) - @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7)) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) def test_activate_deactivate(self, ignite_version): """ Test activate and deactivate cluster. @@ -139,7 +139,7 @@ class BaselineTests(IgniteTest): @cluster(num_nodes=NUM_NODES) @version_if(lambda version: version >= V_2_8_0) - @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8), str(LATEST_2_7)) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) def test_baseline_autoadjust(self, ignite_version): """ Test baseline auto adjust. diff --git a/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py new file mode 100644 index 0000000..5aca06e --- /dev/null +++ b/modules/ducktests/tests/ignitetest/tests/control_utility/tx_test.py @@ -0,0 +1,192 @@ + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +This module contains transactions manipulation test through control utility. +""" +import random + +from ducktape.mark.resource import cluster + +from ignitetest.services.ignite import IgniteService +from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.control_utility import ControlUtility +from ignitetest.services.utils.ignite_configuration import IgniteConfiguration +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration +from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster +from ignitetest.utils import ignite_versions +from ignitetest.utils.ignite_test import IgniteTest +from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion + + +# pylint: disable=W0223 +class TransactionsTests(IgniteTest): + """ + Tests control.sh transaction management command. + """ + NUM_NODES = 4 + CACHE_NAME = "TEST" + + @cluster(num_nodes=NUM_NODES) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) + def test_tx_info(self, ignite_version): + """ + Tests verbose tx info for specific xid. 
+ """ + servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2) + + long_tx = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=2, tx_size=2, + key_prefix='TX_1_KEY_') + + wait_for_key_locked(long_tx) + + control_utility = ControlUtility(servers, self.test_context) + + transactions = control_utility.tx() + + pick_tx = random.choice(transactions) + + res = control_utility.tx_info(pick_tx.xid) + + assert res.xid == pick_tx.xid + assert res.timeout == pick_tx.timeout + assert res.top_ver == pick_tx.top_ver + assert res.label == pick_tx.label + + @cluster(num_nodes=NUM_NODES) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) + def test_kill_tx(self, ignite_version): + """ + Test kill transactions by xid and filter. + """ + servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2) + + tx_count = 3 + + long_tx_1 = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=tx_count, + tx_size=2, key_prefix='TX_1_KEY_', label='TX_1', wait_for_topology_version=4) + + long_tx_2 = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=tx_count, + tx_size=2, key_prefix='TX_2_KEY_', label='TX_2', wait_for_topology_version=4) + + wait_for_key_locked(long_tx_1, long_tx_2) + + control_utility = ControlUtility(servers, self.test_context) + + # check kill with specific xid. + transactions = control_utility.tx(label_pattern='TX_1') + res = control_utility.tx_kill(xid=random.choice(transactions).xid) + assert res and len(res) == 1 and res[0].xid == long_tx_1.extract_result("TX_ID") + + # check kill with filter. + res = control_utility.tx_kill(label_pattern='TX_2') + assert res and len(res) == tx_count and set(map(lambda x: x.xid, res))\ + .issubset(set(long_tx_2.extract_results("TX_ID"))) + + @cluster(num_nodes=NUM_NODES) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) + def test_tx_filter(self, ignite_version): + """ + Test filtering transactions list. 
+ """ + servers = self.__start_ignite_nodes(ignite_version, self.NUM_NODES - 2) + + client_tx_count, client_tx_size = 5, 4 + server_tx_count, server_tx_size = 3, 2 + + servers = self.__start_tx_app(ignite_version, servers, client_mode=False, cache_name=self.CACHE_NAME, + tx_count=server_tx_count, tx_size=server_tx_size, key_prefix='TX_1_KEY_', + label='LBL_SERVER', wait_for_topology_version=4) + + clients = self.__start_tx_app(ignite_version, servers, cache_name=self.CACHE_NAME, tx_count=client_tx_count, + tx_size=client_tx_size, key_prefix='TX_2_KEY_', label='LBL_CLIENT', + wait_for_topology_version=4) + + wait_for_key_locked(clients, servers) + control_utility = ControlUtility(servers, self.test_context) + + start_check = self.monotonic() + assert len(control_utility.tx(clients=True, label_pattern='LBL_.*')) == client_tx_count + assert len(control_utility.tx(servers=True, label_pattern='LBL_.*')) == server_tx_count + + # limit to 2 transactions on each node, therefore 4 total. + assert len(control_utility.tx(limit=2, label_pattern='LBL_.*')) == 4 + + assert len(control_utility.tx(label_pattern='LBL_.*')) == client_tx_count + server_tx_count + + # filter transactions with keys size greater or equal to min_size. + assert len(control_utility.tx(min_size=client_tx_size, label_pattern='LBL_.*')) == client_tx_count + + server_nodes = [node.consistent_id for node in servers.nodes] + assert len(control_utility.tx(label_pattern='LBL_.*', nodes=server_nodes)) == server_tx_count + + # test ordering. + for order_type in ['DURATION', 'SIZE', 'START_TIME']: + transactions = control_utility.tx(label_pattern='LBL_.*', order=order_type) + assert is_sorted(transactions, key=lambda x, attr=order_type: getattr(x, attr.lower()), reverse=True) + + # test min_duration filtering. 
+ min_duration = int(self.monotonic() - start_check) + transactions = control_utility.tx(min_duration=min_duration, label_pattern='LBL_.*') + assert len(transactions) == server_tx_count + client_tx_count + for tx in transactions: + assert tx.duration >= min_duration + + def __start_tx_app(self, version, servers, *, client_mode=True, **kwargs): + app_params = { + 'config': IgniteConfiguration(version=IgniteVersion(version), + client_mode=client_mode, + discovery_spi=from_ignite_cluster(servers)), + 'java_class_name': 'org.apache.ignite.internal.ducktest.tests.control_utility' + '.LongRunningTransactionsGenerator', + 'params': kwargs + } + + app = IgniteApplicationService(self.test_context, **app_params) + app.start() + + return app + + def __start_ignite_nodes(self, version, num_nodes, timeout_sec=60): + config = IgniteConfiguration( + cluster_state="ACTIVE", + version=IgniteVersion(version), + caches=[CacheConfiguration(name=self.CACHE_NAME, atomicity_mode='TRANSACTIONAL')] + ) + + servers = IgniteService(self.test_context, config=config, num_nodes=num_nodes) + + servers.start(timeout_sec=timeout_sec) + + return servers + + +def wait_for_key_locked(*clusters): + """ + Wait for APPLICATION_KEYS_LOCKED on tx_app nodes. + """ + for cluster_ in clusters: + cluster_.await_event("APPLICATION_KEYS_LOCKED", timeout_sec=60, from_the_beginning=True) + + +def is_sorted(lst, key=lambda x: x, reverse=False): + """ + Check if list is sorted. 
+ """ + for i, elem in enumerate(lst[1:]): + return key(elem) <= key(lst[i]) if not reverse else key(elem) >= key(lst[i]) + + return True diff --git a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml index 4811574..698c1d8 100644 --- a/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml +++ b/modules/ducktests/tests/ignitetest/tests/suites/fast_suite.yml @@ -17,7 +17,7 @@ smoke: - ../smoke_test.py control_utility: - - ../control_utility_test.py + - ../control_utility pme_free_switch: - ../pme_free_switch_test.py diff --git a/modules/ducktests/tests/setup.py b/modules/ducktests/tests/setup.py index 5b89d58..de849ed 100644 --- a/modules/ducktests/tests/setup.py +++ b/modules/ducktests/tests/setup.py @@ -30,7 +30,7 @@ setup(name="ignitetest", license="apache2.0", packages=find_packages(exclude=["ignitetest.tests", "ignitetest.tests.*"]), include_package_data=True, - install_requires=['ducktape==0.8.0'], + install_requires=["ducktape==0.8.0", "tox==3.15.2"], dependency_links=[ 'https://github.com/confluentinc/ducktape/tarball/master#egg=ducktape-0.8.0' ]) diff --git a/modules/ducktests/tests/tox.ini b/modules/ducktests/tests/tox.ini index 45107be..7955108 100644 --- a/modules/ducktests/tests/tox.ini +++ b/modules/ducktests/tests/tox.ini @@ -33,6 +33,7 @@ commands = [BASIC] min-public-methods=0 +good-names=i,j,k,x,y,ex,pk,tx [SIMILARITIES] ignore-imports=yes