This is an automated email from the ASF dual-hosted git repository. av pushed a commit to branch ignite-ducktape in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/ignite-ducktape by this push: new 793f056 Transaction to the discovery test. (#8194) 793f056 is described below commit 793f0569002c7cb00119bf8df641e8c0167a19cb Author: Vladsz83 <vlads...@gmail.com> AuthorDate: Fri Sep 4 12:08:11 2020 +0300 Transaction to the discovery test. (#8194) --- .../tests/ContinuousDataLoadApplication.java | 136 ++++++++++++++++++--- .../tests/ignitetest/tests/discovery_test.py | 63 +++++++--- 2 files changed, 162 insertions(+), 37 deletions(-) diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java index 46a4b76..766ede6 100644 --- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java +++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java @@ -17,11 +17,20 @@ package org.apache.ignite.internal.ducktest.tests; -import java.util.Random; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.PropertyAccessor; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.ignite.IgniteCache; +import org.apache.ignite.cache.affinity.Affinity; +import org.apache.ignite.cluster.ClusterNode; import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication; import org.apache.ignite.internal.util.typedef.internal.U; +import org.apache.ignite.transactions.Transaction; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -32,43 +41,130 @@ public class ContinuousDataLoadApplication extends IgniteAwareApplication { /** Logger. */ private static final Logger log = LogManager.getLogger(ContinuousDataLoadApplication.class.getName()); - /** {@inheritDoc} */ - @Override protected void run(JsonNode jsonNode) { - String cacheName = jsonNode.get("cacheName").asText(); - int range = jsonNode.get("range").asInt(); + /** */ + private IgniteCache<Integer, Integer> cache; + + /** Node set to exclusively put data on if required. */ + private List<ClusterNode> nodesToLoad = Collections.emptyList(); - IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName); + /** */ + private Affinity<Integer> aff; - int warmUpCnt = (int)Math.max(1, 0.1f * range); + /** Data number to put before notifying of the initialized state. */ + private int warmUpCnt; - Random rnd = new Random(); + /** {@inheritDoc} */ + @Override protected void run(JsonNode jsonNode) { + Config cfg = parseConfig(jsonNode); - long streamed = 0; + init(cfg); log.info("Generating data in background..."); long notifyTime = System.nanoTime(); + int loaded = 0; + while (active()) { - cache.put(rnd.nextInt(range), rnd.nextInt(range)); + try (Transaction tx = cfg.transactional ? ignite.transactions().txStart() : null) { + for (int i = 0; i < cfg.range && active(); ++i) { + if (skipDataKey(i)) + continue; - streamed++; + cache.put(i, i); - if (notifyTime + U.millisToNanos(1500) < System.nanoTime()) { - notifyTime = System.nanoTime(); + ++loaded; - if (log.isDebugEnabled()) - log.debug("Streamed " + streamed + " entries."); - } + if (notifyTime + U.millisToNanos(1500) < System.nanoTime()) + notifyTime = System.nanoTime(); - // Delayed notify of the initialization to make sure the data load has completelly began and - // has produced some valuable amount of data. - if (!inited() && warmUpCnt == streamed) - markInitialized(); + // Delayed notify of the initialization to make sure the data load has completelly began and + // has produced some valuable amount of data. + if (!inited() && warmUpCnt == loaded) + markInitialized(); + } + + if (tx != null && active()) + tx.commit(); + } } log.info("Background data generation finished."); markFinished(); } + + /** + * @return {@code True} if data should not be put for {@code dataKey}. {@code False} otherwise. + */ + private boolean skipDataKey(int dataKey) { + if (!nodesToLoad.isEmpty()) { + for (ClusterNode n : nodesToLoad) { + if (aff.isPrimary(n, dataKey)) + return false; + } + + return true; + } + + return false; + } + + /** + * Prepares run settings based on {@code cfg}. + */ + private void init(Config cfg) { + cache = ignite.getOrCreateCache(cfg.cacheName); + + if (cfg.targetNodes != null && !cfg.targetNodes.isEmpty()) { + nodesToLoad = ignite.cluster().nodes().stream().filter(n -> cfg.targetNodes.contains(n.id().toString())) + .collect(Collectors.toList()); + + aff = ignite.affinity(cfg.cacheName); + } + + warmUpCnt = cfg.warmUpRange < 1 ? (int)Math.max(1, 0.1f * cfg.range) : cfg.warmUpRange; + } + + /** + * Converts Json-represented config into {@code Config}. + */ + private static Config parseConfig(JsonNode node) { + ObjectMapper objMapper = new ObjectMapper(); + objMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + + Config cfg; + + try { + cfg = objMapper.treeToValue(node, Config.class); + } + catch (Exception e) { + throw new IllegalStateException("Unable to parse config.", e); + } + + return cfg; + } + + /** + * The configuration holder. + */ + private static class Config { + /** Name of the cache. */ + private String cacheName; + + /** Data/keys number to load. */ + private int range; + + /** Node id set. If not empty, data will be load only on this nodes. */ + private Set<String> targetNodes; + + /** If {@code true}, data will be put within transaction. */ + private boolean transactional; + + /** + * Data number to warn-up and to delay the init-notification. If < 1, ignored and considered default 10% of + * {@code range}. + */ + private int warmUpRange; + } } diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py index 6daaa2d..9b28e97 100644 --- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py +++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py @@ -19,6 +19,7 @@ Module contains discovery tests. import random import re +from enum import IntEnum from datetime import datetime from time import monotonic from typing import NamedTuple @@ -30,6 +31,7 @@ from ignitetest.services.ignite import IgniteAwareService from ignitetest.services.ignite import IgniteService from ignitetest.services.ignite_app import IgniteApplicationService from ignitetest.services.utils.ignite_configuration import IgniteConfiguration +from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration from ignitetest.services.utils.ignite_configuration.discovery import from_zookeeper_cluster, from_ignite_cluster, \ TcpDiscoverySpi from ignitetest.services.utils.time_utils import epoch_mills @@ -38,6 +40,15 @@ from ignitetest.utils.ignite_test import IgniteTest from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion +class ClusterLoad(IntEnum): + """ + Type of cluster loading. + """ + none = 0 + atomic = 1 + transactional = 2 + + class DiscoveryTestConfig(NamedTuple): """ Configuration for DiscoveryTest. @@ -45,7 +56,7 @@ class DiscoveryTestConfig(NamedTuple): version: IgniteVersion nodes_to_kill: int = 1 kill_coordinator: bool = False - with_load: bool = False + load_type: ClusterLoad = ClusterLoad.none with_zk: bool = False @@ -61,19 +72,22 @@ class DiscoveryTest(IgniteTest): FAILURE_DETECTION_TIMEOUT = 2000 - DATA_AMOUNT = 100000 + DATA_AMOUNT = 5_000_000 + + WARMUP_DATA_AMOUNT = 10_000 @cluster(num_nodes=NUM_NODES) @matrix(version=[str(DEV_BRANCH), str(LATEST_2_8)], kill_coordinator=[False, True], nodes_to_kill=[1, 2], - with_load=[False, True]) - def test_node_fail_tcp(self, version, kill_coordinator, nodes_to_kill, with_load): + load_type=[ClusterLoad.none, ClusterLoad.atomic, ClusterLoad.transactional]) + def test_node_fail_tcp(self, version, kill_coordinator, nodes_to_kill, load_type): """ Test nodes failure scenario with TcpDiscoverySpi. + :param load_type: How to load cluster during the test: 0 - no loading; 1 - do some loading; 2 - transactional. """ test_config = DiscoveryTestConfig(version=IgniteVersion(version), kill_coordinator=kill_coordinator, - nodes_to_kill=nodes_to_kill, with_load=with_load, with_zk=False) + nodes_to_kill=nodes_to_kill, load_type=load_type, with_zk=False) return self._perform_node_fail_scenario(test_config) @@ -81,13 +95,14 @@ class DiscoveryTest(IgniteTest): @matrix(version=[str(DEV_BRANCH), str(LATEST_2_8)], kill_coordinator=[False, True], nodes_to_kill=[1, 2], - with_load=[False, True]) - def test_node_fail_zk(self, version, kill_coordinator, nodes_to_kill, with_load): + load_type=[ClusterLoad.none, ClusterLoad.atomic, ClusterLoad.transactional]) + def test_node_fail_zk(self, version, kill_coordinator, nodes_to_kill, load_type): """ Test node failure scenario with ZooKeeperSpi. + :param load_type: How to load cluster during the test: 0 - no loading; 1 - do some loading; 2 - transactional. """ test_config = DiscoveryTestConfig(version=IgniteVersion(version), kill_coordinator=kill_coordinator, - nodes_to_kill=nodes_to_kill, with_load=with_load, with_zk=True) + nodes_to_kill=nodes_to_kill, load_type=load_type, with_zk=True) return self._perform_node_fail_scenario(test_config) @@ -104,19 +119,35 @@ class DiscoveryTest(IgniteTest): ignite_config = IgniteConfiguration( version=test_config.version, discovery_spi=discovery_spi, - failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT + failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT, + caches=[CacheConfiguration(name='test-cache', backups=1, atomicity_mode='TRANSACTIONAL' if + test_config.load_type == ClusterLoad.transactional else 'ATOMIC')] ) servers, start_servers_sec = start_servers(self.test_context, self.NUM_NODES - 1, ignite_config, modules) - if test_config.with_load: + failed_nodes, survived_node = choose_node_to_kill(servers, test_config.kill_coordinator, + test_config.nodes_to_kill) + + if test_config.load_type is not ClusterLoad.none: load_config = ignite_config._replace(client_mode=True) if test_config.with_zk else \ ignite_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(servers)) - start_load_app(self.test_context, ignite_config=load_config, data_amount=self.DATA_AMOUNT, modules=modules) + tran_nodes = [n.discovery_info().node_id for n in failed_nodes] \ + if test_config.load_type == ClusterLoad.transactional else None + + params = {"cacheName": "test-cache", + "range": self.DATA_AMOUNT, + "warmUpRange": self.WARMUP_DATA_AMOUNT, + "targetNodes": tran_nodes, + "transactional": bool(tran_nodes)} + + start_load_app(self.test_context, ignite_config=load_config, params=params, modules=modules) + + data = simulate_nodes_failure(servers, failed_nodes, survived_node) - data = simulate_nodes_failure(servers, test_config.kill_coordinator, test_config.nodes_to_kill) data['Ignite cluster start time (s)'] = start_servers_sec + return data @@ -142,7 +173,7 @@ def start_servers(test_context, num_nodes, ignite_config, modules=None): return servers, round(monotonic() - start, 1) -def start_load_app(test_context, ignite_config, data_amount, modules=None): +def start_load_app(test_context, ignite_config, params, modules=None): """ Start loader application. """ @@ -153,7 +184,7 @@ def start_load_app(test_context, ignite_config, data_amount, modules=None): modules=modules, # mute spam in log. jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"], - params={"cacheName": "test-cache", "range": data_amount}) + params=params) loader.start() @@ -186,12 +217,10 @@ def choose_node_to_kill(servers, kill_coordinator, nodes_to_kill): return to_kill, survive -def simulate_nodes_failure(servers, kill_coordinator, nodes_to_kill): +def simulate_nodes_failure(servers, failed_nodes, survived_node): """ Perform node failure scenario """ - failed_nodes, survived_node = choose_node_to_kill(servers, kill_coordinator, nodes_to_kill) - ids_to_wait = [node.discovery_info().node_id for node in failed_nodes] _, first_terminated = servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)