This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch ignite-ducktape in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 47d1311 IGNITE-13433 Benchmark confirms operation's latency drop decrease on Cellular switch comparing to PME-free switch (#8232)
47d1311 is described below

commit 47d131124996cfc3f8a986a3b7406a364424bbf0
Author: Anton Vinogradov <a...@apache.org>
AuthorDate: Tue Sep 15 10:19:59 2020 +0300

    IGNITE-13433 Benchmark confirms operation's latency drop decrease on Cellular switch comparing to PME-free switch (#8232)
---
 .../CellularPreparedTxStreamer.java | 92 +++++++++++++
 .../cellular_affinity_test/CellularTxStreamer.java | 112 +++++++++++++++
 .../DistributionChecker.java | 4 +-
 .../SingleKeyTxStreamerApplication.java | 2 +-
 .../ducktest/utils/IgniteAwareApplication.java | 153 ++++++++++++++++-----
 modules/ducktests/tests/docker/run_tests.sh | 4 +-
 .../tests/ignitetest/services/ignite_app.py | 54 ++++++--
 .../tests/ignitetest/services/utils/ignite_spec.py | 9 +-
 .../ignitetest/tests/add_node_rebalance_test.py | 6 -
 .../ignitetest/tests/cellular_affinity_test.py | 120 +++++++++++++++-
 .../tests/ignitetest/tests/pme_free_switch_test.py | 14 +-
 .../tests/ignitetest/utils/ignite_test.py | 7 -
 12 files changed, 484 insertions(+), 93 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
new file mode 100644
index 0000000..43424eb
--- /dev/null
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test; + +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; +import org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl; +import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.transactions.Transaction; + +/** + * Prepares transactions at specified cell. + */ +public class CellularPreparedTxStreamer extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override protected void run(JsonNode jsonNode) throws Exception { + final String cacheName = jsonNode.get("cacheName").asText(); + final String attr = jsonNode.get("attr").asText(); + final String cell = jsonNode.get("cell").asText(); + final int txCnt = jsonNode.get("txCnt").asInt(); + + markInitialized(); + + waitForActivation(); + + IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName); + + log.info("Starting Prepared Txs..."); + + Affinity<Integer> aff = ignite.affinity(cacheName); + + int cnt = 0; + int i = -1; // Negative keys to have no intersection with load. 
+ + while (cnt != txCnt && !terminated()) { + Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i); + + Map<Object, Long> stat = nodes.stream().collect( + Collectors.groupingBy(n -> n.attributes().get(attr), Collectors.counting())); + + assert 1 == stat.keySet().size() : + "Partition should be located on nodes from only one cell " + + "[key=" + i + ", nodes=" + nodes.size() + ", stat=" + stat + "]"; + + if (stat.containsKey(cell)) { + cnt++; + + Transaction tx = ignite.transactions().txStart(); + + cache.put(i, i); + + ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true); + + if (cnt % 100 == 0) + log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + ", cell=" + stat.keySet() + "]"); + } + + i--; + } + + log.info("All transactions prepared (" + cnt + ")"); + + while (!terminated()) { + log.info("Waiting for SIGTERM."); + + U.sleep(1000); + } + + markFinished(); + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java new file mode 100644 index 0000000..ebfc6f2 --- /dev/null +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Map; +import java.util.stream.Collectors; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; + +/** + * Streams transactions to specified cell. + */ +public class CellularTxStreamer extends IgniteAwareApplication { + /** {@inheritDoc} */ + @Override public void run(JsonNode jsonNode) throws Exception { + String cacheName = jsonNode.get("cacheName").asText(); + int warmup = jsonNode.get("warmup").asInt(); + String cell = jsonNode.get("cell").asText(); + String attr = jsonNode.get("attr").asText(); + + markInitialized(); + + waitForActivation(); + + IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName); + + long[] max = new long[20]; + + Arrays.fill(max, -1); + + int key = 0; + + int cnt = 0; + + long initTime = 0; + + boolean record = false; + + Affinity<Integer> aff = ignite.affinity(cacheName); + + while (!terminated()) { + key++; + + Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key); + + Map<Object, Long> stat = nodes.stream().collect( + Collectors.groupingBy(n -> n.attributes().get(attr), Collectors.counting())); + + if (!stat.containsKey(cell)) + continue; + + cnt++; + + long start = System.currentTimeMillis(); + + cache.put(key, key); + + long finish = 
System.currentTimeMillis(); + + long time = finish - start; + + if (!record && cnt > warmup) { + record = true; + + initTime = System.currentTimeMillis(); + + log.info("Warmup finished"); + } + + if (record) { + for (int i = 0; i < max.length; i++) { + if (max[i] <= time) { + System.arraycopy(max, i, max, i + 1, max.length - i - 1); + + max[i] = time; + + break; + } + } + } + + if (cnt % 1000 == 0) + log.info("Application streamed " + cnt + " transactions [worst_latency=" + Arrays.toString(max) + "]"); + } + + recordResult("WORST_LATENCY", Arrays.toString(max)); + recordResult("STREAMED", cnt - warmup); + recordResult("MEASURE_DURATION", System.currentTimeMillis() - initTime); + + markFinished(); + } +} diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java index 2daf63e..22b2c94 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java @@ -28,9 +28,7 @@ import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; * */ public class DistributionChecker extends IgniteAwareApplication { - /** - * {@inheritDoc} - */ + /** {@inheritDoc} */ @Override protected void run(JsonNode jsonNode) { String cacheName = jsonNode.get("cacheName").asText(); String attr = jsonNode.get("attr").asText(); diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java index 8c8be15..14b53a9 100644 --- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java @@ -55,7 +55,7 @@ public class SingleKeyTxStreamerApplication extends IgniteAwareApplication { if (!record && cnt > warmup) { record = true; - initTime = System.currentTimeMillis();; + initTime = System.currentTimeMillis(); markInitialized(); } diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java index 107787b..a74b53f 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java @@ -17,9 +17,14 @@ package org.apache.ignite.internal.ducktest.utils; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; import com.fasterxml.jackson.databind.JsonNode; import org.apache.ignite.Ignite; +import org.apache.ignite.cluster.ClusterState; +import org.apache.ignite.internal.IgniteEx; import org.apache.ignite.internal.IgniteInterruptedCheckedException; +import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -55,8 +60,8 @@ public abstract class IgniteAwareApplication { /** Terminated. */ private static volatile boolean terminated; - /** Shutdown hook. */ - private static volatile Thread hook; + /** State mutex. */ + private static final Object stateMux = new Object(); /** Ignite. */ protected Ignite ignite; @@ -68,7 +73,7 @@ public abstract class IgniteAwareApplication { * Default constructor. 
*/ protected IgniteAwareApplication() { - Runtime.getRuntime().addShutdownHook(hook = new Thread(() -> { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { log.info("SIGTERM recorded."); if (!finished && !broken) @@ -79,8 +84,13 @@ public abstract class IgniteAwareApplication { if (log.isDebugEnabled()) log.debug("Waiting for graceful termination..."); + int iter = 0; + while (!finished && !broken) { - log.info("Waiting for graceful termination cycle..."); + log.info("Waiting for graceful termination cycle... [iter=" + ++iter + "]"); + + if (iter == 100) + dumpThreads(); try { U.sleep(100); @@ -101,49 +111,72 @@ public abstract class IgniteAwareApplication { * Used to marks as started to perform actions. Suitable for async runs. */ protected void markInitialized() { - assert !inited; + log.info("Marking as initialized."); - log.info(APP_INITED); + synchronized (stateMux) { + assert !inited; + assert !finished; + assert !broken; - inited = true; + log.info(APP_INITED); + + inited = true; + } } /** * */ protected void markFinished() { - assert !finished; - assert !broken; + log.info("Marking as finished."); - log.info(APP_FINISHED); + synchronized (stateMux) { + assert inited; + assert !finished; + assert !broken; - if (!terminated()) - removeShutdownHook(); + log.info(APP_FINISHED); - finished = true; + finished = true; + } } /** * */ - private void markBroken() { - assert !finished; - assert !broken; + protected void markBroken(Throwable th) { + log.info("Marking as broken."); - log.info(APP_BROKEN); + synchronized (stateMux) { + if (broken) { + log.info("Already marked as broken."); - removeShutdownHook(); + return; + } - broken = true; + recordResult("ERROR", th.toString()); + + assert !finished; + + log.error(APP_BROKEN); + + broken = true; + } } /** * */ - private void removeShutdownHook() { - Runtime.getRuntime().removeShutdownHook(hook); + private void terminate() { + log.info("Marking as initialized."); + + synchronized (stateMux) { + assert 
!terminated; + + log.info(APP_TERMINATED); - log.info("Shutdown hook removed."); + terminated = true; + } } /** @@ -157,17 +190,6 @@ public abstract class IgniteAwareApplication { /** * */ - private void terminate() { - assert !terminated; - - log.info(APP_TERMINATED); - - terminated = true; - } - - /** - * - */ protected boolean terminated() { return terminated; } @@ -226,12 +248,71 @@ public abstract class IgniteAwareApplication { catch (Throwable th) { log.error("Unexpected Application failure... ", th); - recordResult("ERROR", th.getMessage()); - - markBroken(); + if (!broken) + markBroken(th); } finally { log.info("Application finished."); } } + + /** + * + */ + private static void dumpThreads() { + ThreadInfo[] infos = ManagementFactory.getThreadMXBean().dumpAllThreads(true, true); + + for (ThreadInfo info : infos) { + log.info(info.toString()); + + if ("main".equals(info.getThreadName())) { + StringBuilder sb = new StringBuilder(); + + sb.append("main\n"); + + for (StackTraceElement element : info.getStackTrace()) { + sb.append("\tat ").append(element.toString()); + sb.append('\n'); + } + + log.info(sb.toString()); + } + } + } + + /** + * + */ + protected void waitForActivation() throws IgniteInterruptedCheckedException { + boolean newApi = ignite.cluster().localNode().version().greaterThanEqual(2, 9, 0); + + while (newApi ? 
ignite.cluster().state() != ClusterState.ACTIVE : !ignite.cluster().active()) { + U.sleep(100); + + log.info("Waiting for cluster activation"); + } + + log.info("Cluster Activated"); + } + + /** + * + */ + protected void waitForRebalanced() throws IgniteInterruptedCheckedException { + boolean possible = ignite.cluster().localNode().version().greaterThanEqual(2, 8, 0); + + if (possible) { + GridCachePartitionExchangeManager<?, ?> mgr = ((IgniteEx)ignite).context().cache().context().exchange(); + + while (!mgr.lastFinishedFuture().rebalanced()) { + U.sleep(1000); + + log.info("Waiting for cluster rebalance finish"); + } + + log.info("Cluster Rebalanced"); + } + else + throw new UnsupportedOperationException("Operation supported since 2.8.0"); + } } diff --git a/modules/ducktests/tests/docker/run_tests.sh b/modules/ducktests/tests/docker/run_tests.sh index d8eb270..93a8b7c 100755 --- a/modules/ducktests/tests/docker/run_tests.sh +++ b/modules/ducktests/tests/docker/run_tests.sh @@ -21,7 +21,7 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" # DuckerUp parameters are specified with env variables # Num of cotainers that ducktape will prepare for tests -IGNITE_NUM_CONTAINERS=${IGNITE_NUM_CONTAINERS:-11} +IGNITE_NUM_CONTAINERS=${IGNITE_NUM_CONTAINERS:-13} # Image name to run nodes default_image_name="ducker-ignite-openjdk-8" @@ -55,7 +55,7 @@ The options are as follows: Display this help message. -n|--num-nodes - Specify how many nodes to start. Default number of nodes to start: 11. + Specify how many nodes to start. Default number of nodes to start: 13 (12 + 1 used by ducktape). -j|--max-parallel Specify max number of tests that can be run in parallel. 
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py b/modules/ducktests/tests/ignitetest/services/ignite_app.py index b40d01a..9e396f2 100644 --- a/modules/ducktests/tests/ignitetest/services/ignite_app.py +++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py @@ -19,6 +19,9 @@ This module contains the base class to build Ignite aware application written on import re +# pylint: disable=W0622 +from ducktape.errors import TimeoutError + from ignitetest.services.utils.ignite_aware import IgniteAwareService @@ -38,7 +41,7 @@ class IgniteApplicationService(IgniteAwareService): self.servicejava_class_name = servicejava_class_name self.java_class_name = java_class_name self.timeout_sec = timeout_sec - self.stop_timeout_sec = 10 + self.params = params def start(self): super().start() @@ -46,25 +49,46 @@ class IgniteApplicationService(IgniteAwareService): self.logger.info("Waiting for Ignite aware Application (%s) to start..." % self.java_class_name) self.await_event("Topology snapshot", self.timeout_sec, from_the_beginning=True) - self.await_event("IGNITE_APPLICATION_INITIALIZED\\|IGNITE_APPLICATION_BROKEN", self.timeout_sec, - from_the_beginning=True) - try: - self.await_event("IGNITE_APPLICATION_INITIALIZED", 1, from_the_beginning=True) - except Exception: - raise Exception("Java application execution failed. %s" % self.extract_result("ERROR")) from None + self.__check_status("IGNITE_APPLICATION_INITIALIZED", timeout=self.timeout_sec) - # pylint: disable=W0221 - def stop_node(self, node, clean_shutdown=True, timeout_sec=20): - self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(node.account))) - node.account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown, allow_fail=True) + def stop_async(self, clean_shutdown=True): + """ + Stops node in async way. 
+ """ + self.logger.info("%s Stopping node %s" % (self.__class__.__name__, str(self.nodes[0].account))) + self.nodes[0].account.kill_java_processes(self.servicejava_class_name, clean_shutdown=clean_shutdown, + allow_fail=True) - stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec) + def await_stopped(self, timeout_sec=10): + """ + Awaits node stop finish. + """ + stopped = self.wait_node(self.nodes[0], timeout_sec=timeout_sec) assert stopped, "Node %s: did not stop within the specified timeout of %s seconds" % \ - (str(node.account), str(self.stop_timeout_sec)) + (str(self.nodes[0].account), str(timeout_sec)) - self.await_event("IGNITE_APPLICATION_FINISHED\\|IGNITE_APPLICATION_BROKEN", from_the_beginning=True, - timeout_sec=timeout_sec) + self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec) + + # pylint: disable=W0221 + def stop_node(self, node, clean_shutdown=True, timeout_sec=10): + assert node == self.nodes[0] + self.stop_async(clean_shutdown) + self.await_stopped(timeout_sec) + + def __check_status(self, desired, timeout=1): + self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, from_the_beginning=True) + + try: + self.await_event("IGNITE_APPLICATION_BROKEN", 1, from_the_beginning=True) + raise Exception("Java application execution failed. 
%s" % self.extract_result("ERROR")) + except TimeoutError: + pass + + try: + self.await_event(desired, 1, from_the_beginning=True) + except Exception: + raise Exception("Java application execution failed.") from None def clean_node(self, node): if self.alive(node): diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py index 124fefb..afb975a 100644 --- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py +++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py @@ -22,11 +22,10 @@ import importlib import json from abc import ABCMeta, abstractmethod -from ignitetest.services.utils.ignite_path import IgnitePath from ignitetest.services.utils.config_template import IgniteClientConfigTemplate, IgniteServerConfigTemplate -from ignitetest.utils.version import DEV_BRANCH - +from ignitetest.services.utils.ignite_path import IgnitePath from ignitetest.services.utils.ignite_persistence import IgnitePersistenceAware +from ignitetest.utils.version import DEV_BRANCH def resolve_spec(service, context, config, **kwargs): @@ -147,6 +146,8 @@ class ApacheIgniteNodeSpec(IgniteNodeSpec, IgnitePersistenceAware): libs.append("log4j") libs = list(map(lambda m: self.path.module(m) + "/*", libs)) + libs.append(IgnitePath(DEV_BRANCH).module("ducktests") + "/*") + self.envs = { 'EXCLUDE_TEST_CLASSES': 'true', 'IGNITE_LOG_DIR': self.PERSISTENT_ROOT, @@ -183,7 +184,7 @@ class ApacheIgniteApplicationSpec(IgniteApplicationSpec, IgnitePersistenceAware) } self.jvm_opts.extend([ - "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file ", + "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file", "-Dlog4j.configDebug=true", "-DIGNITE_NO_SHUTDOWN_HOOK=true", # allows to perform operations on app termination. 
"-Xmx1G", diff --git a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py index 0681424..82bc730 100644 --- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py +++ b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py @@ -47,15 +47,11 @@ class AddNodeRebalanceTest(IgniteTest): * Put data to it via IgniteClientApp. * Start one more node and awaits for rebalance to finish. """ - self.stage("Start Ignite nodes") - node_config = IgniteConfiguration(version=IgniteVersion(ignite_version)) ignites = IgniteService(self.test_context, config=node_config, num_nodes=self.NUM_NODES - 1) ignites.start() - self.stage("Starting DataGenerationApplication") - # This client just put some data to the cache. app_config = node_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites)) IgniteApplicationService(self.test_context, config=app_config, @@ -66,8 +62,6 @@ class AddNodeRebalanceTest(IgniteTest): ignite = IgniteService(self.test_context, node_config._replace(discovery_spi=from_ignite_cluster(ignites)), num_nodes=1) - self.stage("Starting Ignite node") - ignite.start() start = self.monotonic() diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py index c957e43..3c38b22 100644 --- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py +++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py @@ -22,11 +22,12 @@ from jinja2 import Template from ignitetest.services.ignite import IgniteService from ignitetest.services.ignite_app import IgniteApplicationService +from ignitetest.services.utils.control_utility import ControlUtility from ignitetest.services.utils.ignite_configuration import IgniteConfiguration, IgniteClientConfiguration from ignitetest.services.utils.ignite_configuration.discovery import from_ignite_cluster from 
ignitetest.utils import ignite_versions, version_if from ignitetest.utils.ignite_test import IgniteTest -from ignitetest.utils.version import DEV_BRANCH, IgniteVersion +from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST_2_8 # pylint: disable=W0223 @@ -40,6 +41,8 @@ class CellularAffinity(IgniteTest): CACHE_NAME = "test-cache" + PREPARED_TX_CNT = 500 # possible amount at real cluster under load (per cell). + CONFIG_TEMPLATE = """ <property name="cacheConfiguration"> <list> @@ -55,6 +58,7 @@ class CellularAffinity(IgniteTest): </property> <property name="name" value="{{ cacheName }}"/> <property name="backups" value="{{ nodes }}"/> + <property name="atomicityMode" value="TRANSACTIONAL"/> </bean> </list> </property> @@ -73,15 +77,17 @@ class CellularAffinity(IgniteTest): @cluster(num_nodes=NUM_NODES * 3 + 1) @version_if(lambda version: version >= DEV_BRANCH) @ignite_versions(str(DEV_BRANCH)) - def test(self, ignite_version): + def test_distribution(self, ignite_version): """ - Test Cellular Affinity scenario (partition distribution). + Tests Cellular Affinity scenario (partition distribution). """ cell1 = self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + '=1']) self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + '=2'], joined_cluster=cell1) self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + '=XXX', '-DRANDOM=42'], joined_cluster=cell1) + ControlUtility(cell1, self.test_context).activate() + checker = IgniteApplicationService( self.test_context, IgniteClientConfiguration(version=IgniteVersion(ignite_version), discovery_spi=from_ignite_cluster(cell1)), @@ -92,15 +98,117 @@ class CellularAffinity(IgniteTest): checker.run() - def start_cell(self, version, jvm_opts, joined_cluster=None): + # pylint: disable=R0914 + @cluster(num_nodes=NUM_NODES * (3 + 1)) + @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8)) + def test_latency(self, ignite_version): + """ + Tests Cellular switch tx latency. 
+ """ + data = {} + + cell1, prepared_tx_loader1 = self.start_cell_with_prepared_txs(ignite_version, "C1") + _, prepared_tx_loader2 = self.start_cell_with_prepared_txs(ignite_version, "C2", joined_cluster=cell1) + _, prepared_tx_loader3 = self.start_cell_with_prepared_txs(ignite_version, "C3", joined_cluster=cell1) + + loaders = [prepared_tx_loader1, prepared_tx_loader2, prepared_tx_loader3] + + failed_loader = prepared_tx_loader3 + + tx_streamer1 = self.start_tx_streamer(ignite_version, "C1", joined_cluster=cell1) + tx_streamer2 = self.start_tx_streamer(ignite_version, "C2", joined_cluster=cell1) + tx_streamer3 = self.start_tx_streamer(ignite_version, "C3", joined_cluster=cell1) + + streamers = [tx_streamer1, tx_streamer2, tx_streamer3] + + for streamer in streamers: # starts tx streaming with latency record (with some warmup). + streamer.start() + + ControlUtility(cell1, self.test_context).disable_baseline_auto_adjust() # baseline set. + ControlUtility(cell1, self.test_context).activate() + + for loader in loaders: + loader.await_event("All transactions prepared", 180, from_the_beginning=True) + + for streamer in streamers: + streamer.await_event("Warmup finished", 180, from_the_beginning=True) + + failed_loader.stop_async() # node left with prepared txs. + + for streamer in streamers: + streamer.await_event("Node left topology\\|Node FAILED", 60, from_the_beginning=True) + + for streamer in streamers: # just an assertion that we have PME-free switch. + streamer.await_event("exchangeFreeSwitch=true", 60, from_the_beginning=True) + + for streamer in streamers: # waiting for streaming continuation. + streamer.await_event("Application streamed", 60) + + for streamer in streamers: # stops streaming and records results. 
+ streamer.stop_async() + + for streamer in streamers: + streamer.await_stopped() + + cell = streamer.params["cell"] + + data["[%s cell %s]" % ("alive" if cell is not failed_loader.params["cell"] else "broken", cell)] = \ + "worst_latency=%s, tx_streamed=%s, measure_duration=%s" % ( + streamer.extract_result("WORST_LATENCY"), streamer.extract_result("STREAMED"), + streamer.extract_result("MEASURE_DURATION")) + + return data + + def start_tx_streamer(self, version, cell, joined_cluster): + """ + Starts transaction streamer. + """ + return IgniteApplicationService( + self.test_context, + IgniteClientConfiguration(version=IgniteVersion(version), properties=self.properties(), + discovery_spi=from_ignite_cluster(joined_cluster)), + java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularTxStreamer", + params={"cacheName": CellularAffinity.CACHE_NAME, + "attr": CellularAffinity.ATTRIBUTE, + "cell": cell, + "warmup": 10000}, + timeout_sec=180) + + def start_cell_with_prepared_txs(self, version, cell_id, joined_cluster=None): + """ + Starts cell with prepared transactions. + """ + nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id], + CellularAffinity.NUM_NODES - 1, joined_cluster) + + prepared_tx_streamer = IgniteApplicationService( # last server node at the cell. + self.test_context, + IgniteConfiguration(version=IgniteVersion(version), properties=self.properties(), + discovery_spi=from_ignite_cluster(nodes)), # Server node. + java_class_name= + "org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularPreparedTxStreamer", + params={"cacheName": CellularAffinity.CACHE_NAME, + "attr": CellularAffinity.ATTRIBUTE, + "cell": cell_id, + "txCnt": CellularAffinity.PREPARED_TX_CNT}, + jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id], + timeout_sec=180) + + prepared_tx_streamer.start() # starts last server node and creates prepared txs on it. 
+ + return nodes, prepared_tx_streamer + + def start_cell(self, version, jvm_opts, nodes_cnt=NUM_NODES, joined_cluster=None): """ Starts cell. """ - config = IgniteConfiguration(version=IgniteVersion(version), properties=self.properties()) + config = IgniteConfiguration(version=IgniteVersion(version), properties=self.properties(), + cluster_state="INACTIVE") + if joined_cluster: config = config._replace(discovery_spi=from_ignite_cluster(joined_cluster)) - ignites = IgniteService(self.test_context, config, num_nodes=CellularAffinity.NUM_NODES, jvm_opts=jvm_opts) + ignites = IgniteService(self.test_context, config, num_nodes=nodes_cnt, jvm_opts=jvm_opts) ignites.start() diff --git a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py index 395b24b..3b54762 100644 --- a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py +++ b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py @@ -43,12 +43,10 @@ class PmeFreeSwitchTest(IgniteTest): @ignite_versions(str(DEV_BRANCH), str(LATEST_2_7)) def test(self, ignite_version): """ - Test PME free scenario (node stop). + Tests PME free scenario (node stop). 
""" data = {} - self.stage("Starting nodes") - config = IgniteConfiguration( version=IgniteVersion(ignite_version), caches=[CacheConfiguration(name='test-cache', backups=2, atomicity_mode='TRANSACTIONAL')] @@ -58,8 +56,6 @@ class PmeFreeSwitchTest(IgniteTest): ignites.start() - self.stage("Starting long_tx_streamer") - client_config = config._replace(client_mode=True, discovery_spi=from_ignite_cluster(ignites, slice(0, self.NUM_NODES - 1))) @@ -71,8 +67,6 @@ class PmeFreeSwitchTest(IgniteTest): long_tx_streamer.start() - self.stage("Starting single_key_tx_streamer") - single_key_tx_streamer = IgniteApplicationService( self.test_context, client_config, @@ -85,20 +79,14 @@ class PmeFreeSwitchTest(IgniteTest): if IgniteVersion(ignite_version) >= V_2_8_0: ControlUtility(ignites, self.test_context).disable_baseline_auto_adjust() - self.stage("Stopping server node") - ignites.stop_node(ignites.nodes[self.NUM_NODES - 1]) long_tx_streamer.await_event("Node left topology", 60, from_the_beginning=True) time.sleep(30) # keeping txs alive for 30 seconds. - self.stage("Stopping long_tx_streamer") - long_tx_streamer.stop() - self.stage("Stopping single_key_tx_streamer") - single_key_tx_streamer.stop() data["Worst latency (ms)"] = single_key_tx_streamer.extract_result("WORST_LATENCY") diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py b/modules/ducktests/tests/ignitetest/utils/ignite_test.py index a4bfe7d..feb5993 100644 --- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py +++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py @@ -29,13 +29,6 @@ class IgniteTest(Test): def __init__(self, test_context): super().__init__(test_context=test_context) - def stage(self, msg): - """ - Print stage mark. - :param msg: Stage mark message. - """ - self.logger.info("[TEST_STAGE] " + msg) - @staticmethod def monotonic(): """