This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 793f056   Transaction to the discovery test.  (#8194)
793f056 is described below

commit 793f0569002c7cb00119bf8df641e8c0167a19cb
Author: Vladsz83 <vlads...@gmail.com>
AuthorDate: Fri Sep 4 12:08:11 2020 +0300

     Transaction to the discovery test.  (#8194)
---
 .../tests/ContinuousDataLoadApplication.java       | 136 ++++++++++++++++++---
 .../tests/ignitetest/tests/discovery_test.py       |  63 +++++++---
 2 files changed, 162 insertions(+), 37 deletions(-)

diff --git a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
index 46a4b76..766ede6 100644
--- a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
+++ b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/ContinuousDataLoadApplication.java
@@ -17,11 +17,20 @@
 
 package org.apache.ignite.internal.ducktest.tests;
 
-import java.util.Random;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.annotation.JsonAutoDetect;
+import com.fasterxml.jackson.annotation.PropertyAccessor;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
 import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.Transaction;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -32,43 +41,130 @@ public class ContinuousDataLoadApplication extends IgniteAwareApplication {
     /** Logger. */
     private static final Logger log = LogManager.getLogger(ContinuousDataLoadApplication.class.getName());
 
-    /** {@inheritDoc} */
-    @Override protected void run(JsonNode jsonNode) {
-        String cacheName = jsonNode.get("cacheName").asText();
-        int range = jsonNode.get("range").asInt();
+    /** */
+    private IgniteCache<Integer, Integer> cache;
+
+    /** Nodes to put data on exclusively, if set. */
+    private List<ClusterNode> nodesToLoad = Collections.emptyList();
 
-        IgniteCache<Integer, Integer> cache = ignite.getOrCreateCache(cacheName);
+    /** */
+    private Affinity<Integer> aff;
 
-        int warmUpCnt = (int)Math.max(1, 0.1f * range);
+    /** Number of entries to put before reporting the initialized state. */
+    private int warmUpCnt;
 
-        Random rnd = new Random();
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) {
+        Config cfg = parseConfig(jsonNode);
 
-        long streamed = 0;
+        init(cfg);
 
         log.info("Generating data in background...");
 
         long notifyTime = System.nanoTime();
 
+        int loaded = 0;
+
         while (active()) {
-            cache.put(rnd.nextInt(range), rnd.nextInt(range));
+            try (Transaction tx = cfg.transactional ? ignite.transactions().txStart() : null) {
+                for (int i = 0; i < cfg.range && active(); ++i) {
+                    if (skipDataKey(i))
+                        continue;
 
-            streamed++;
+                    cache.put(i, i);
 
-            if (notifyTime + U.millisToNanos(1500) < System.nanoTime()) {
-                notifyTime = System.nanoTime();
+                    ++loaded;
 
-                if (log.isDebugEnabled())
-                    log.debug("Streamed " + streamed + " entries.");
-            }
+                    if (notifyTime + U.millisToNanos(1500) < System.nanoTime())
+                        notifyTime = System.nanoTime();
 
-            // Delayed notify of the initialization to make sure the data load has completelly began and
-            // has produced some valuable amount of data.
-            if (!inited() && warmUpCnt == streamed)
-                markInitialized();
+                    // Delayed notification of the initialization to make sure the data load has completely begun
+                    // and has produced a meaningful amount of data.
+                    if (!inited() && warmUpCnt == loaded)
+                        markInitialized();
+                }
+
+                if (tx != null && active())
+                    tx.commit();
+            }
         }
 
         log.info("Background data generation finished.");
 
         markFinished();
     }
+
+    /**
+     * @return {@code True} if data should not be put for {@code dataKey}. {@code False} otherwise.
+     */
+    private boolean skipDataKey(int dataKey) {
+        if (!nodesToLoad.isEmpty()) {
+            for (ClusterNode n : nodesToLoad) {
+                if (aff.isPrimary(n, dataKey))
+                    return false;
+            }
+
+            return true;
+        }
+
+        return false;
+    }
+
+    /**
+     * Prepares run settings based on {@code cfg}.
+     */
+    private void init(Config cfg) {
+        cache = ignite.getOrCreateCache(cfg.cacheName);
+
+        if (cfg.targetNodes != null && !cfg.targetNodes.isEmpty()) {
+            nodesToLoad = ignite.cluster().nodes().stream().filter(n -> cfg.targetNodes.contains(n.id().toString()))
+                .collect(Collectors.toList());
+
+            aff = ignite.affinity(cfg.cacheName);
+        }
+
+        warmUpCnt = cfg.warmUpRange < 1 ? (int)Math.max(1, 0.1f * cfg.range) : cfg.warmUpRange;
+    }
+
+    /**
+     * Converts a JSON-represented config into {@code Config}.
+     */
+    private static Config parseConfig(JsonNode node) {
+        ObjectMapper objMapper = new ObjectMapper();
+        objMapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);
+
+        Config cfg;
+
+        try {
+            cfg = objMapper.treeToValue(node, Config.class);
+        }
+        catch (Exception e) {
+            throw new IllegalStateException("Unable to parse config.", e);
+        }
+
+        return cfg;
+    }
+
+    /**
+     * The configuration holder.
+     */
+    private static class Config {
+        /** Name of the cache. */
+        private String cacheName;
+
+        /** Number of keys to load. */
+        private int range;
+
+        /** Node id set. If not empty, data will be loaded only onto these nodes. */
+        private Set<String> targetNodes;
+
+        /** If {@code true}, data will be put within a transaction. */
+        private boolean transactional;
+
+        /**
+         * Number of entries to load during warm-up, delaying the init-notification. If < 1, it is ignored and
+         * 10% of {@code range} is used by default.
+         */
+        private int warmUpRange;
+    }
 }
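
The core mechanism of the new load application is skipDataKey(): when targetNodes
is set, only keys whose primary partitions live on those nodes are written, so a
transaction over them is guaranteed to involve the nodes the test is about to
kill. Below is a minimal sketch of that affinity filter, assuming a single local
node started with a default configuration (the class name is illustrative, not
part of the patch):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.affinity.Affinity;
    import org.apache.ignite.cluster.ClusterNode;

    public class AffinityFilterSketch {
        public static void main(String[] args) {
            // Start a single local node with the default configuration.
            try (Ignite ignite = Ignition.start()) {
                ignite.getOrCreateCache("test-cache");

                Affinity<Integer> aff = ignite.affinity("test-cache");
                ClusterNode target = ignite.cluster().localNode();

                for (int key = 0; key < 10; key++) {
                    // Mirrors skipDataKey(): load a key only if its primary
                    // copy lives on the target node.
                    if (aff.isPrimary(target, key))
                        System.out.println("load key " + key);
                }
            }
        }
    }

Note also how the load loop opens the transaction: try-with-resources skips
close() for a null resource, so "try (Transaction tx = cfg.transactional ?
ignite.transactions().txStart() : null)" lets one loop body serve both the
atomic and the transactional modes.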
diff --git a/modules/ducktests/tests/ignitetest/tests/discovery_test.py b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
index 6daaa2d..9b28e97 100644
--- a/modules/ducktests/tests/ignitetest/tests/discovery_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/discovery_test.py
@@ -19,6 +19,7 @@ Module contains discovery tests.
 
 import random
 import re
+from enum import IntEnum
 from datetime import datetime
 from time import monotonic
 from typing import NamedTuple
@@ -30,6 +31,7 @@ from ignitetest.services.ignite import IgniteAwareService
 from ignitetest.services.ignite import IgniteService
 from ignitetest.services.ignite_app import IgniteApplicationService
 from ignitetest.services.utils.ignite_configuration import IgniteConfiguration
+from ignitetest.services.utils.ignite_configuration.cache import CacheConfiguration
 from ignitetest.services.utils.ignite_configuration.discovery import from_zookeeper_cluster, from_ignite_cluster, \
     TcpDiscoverySpi
 from ignitetest.services.utils.time_utils import epoch_mills
@@ -38,6 +40,15 @@ from ignitetest.utils.ignite_test import IgniteTest
 from ignitetest.utils.version import DEV_BRANCH, LATEST_2_8, IgniteVersion
 
 
+class ClusterLoad(IntEnum):
+    """
+    Type of cluster loading.
+    """
+    none = 0
+    atomic = 1
+    transactional = 2
+
+
 class DiscoveryTestConfig(NamedTuple):
     """
     Configuration for DiscoveryTest.
@@ -45,7 +56,7 @@ class DiscoveryTestConfig(NamedTuple):
     version: IgniteVersion
     nodes_to_kill: int = 1
     kill_coordinator: bool = False
-    with_load: bool = False
+    load_type: ClusterLoad = ClusterLoad.none
     with_zk: bool = False
 
 
@@ -61,19 +72,22 @@ class DiscoveryTest(IgniteTest):
 
     FAILURE_DETECTION_TIMEOUT = 2000
 
-    DATA_AMOUNT = 100000
+    DATA_AMOUNT = 5_000_000
+
+    WARMUP_DATA_AMOUNT = 10_000
 
     @cluster(num_nodes=NUM_NODES)
     @matrix(version=[str(DEV_BRANCH), str(LATEST_2_8)],
             kill_coordinator=[False, True],
             nodes_to_kill=[1, 2],
-            with_load=[False, True])
-    def test_node_fail_tcp(self, version, kill_coordinator, nodes_to_kill, with_load):
+            load_type=[ClusterLoad.none, ClusterLoad.atomic, ClusterLoad.transactional])
+    def test_node_fail_tcp(self, version, kill_coordinator, nodes_to_kill, load_type):
         """
         Test node failure scenario with TcpDiscoverySpi.
+        :param load_type: How to load the cluster during the test: 0 - none; 1 - atomic puts; 2 - transactional puts.
         """
         test_config = DiscoveryTestConfig(version=IgniteVersion(version), kill_coordinator=kill_coordinator,
-                                          nodes_to_kill=nodes_to_kill, with_load=with_load, with_zk=False)
+                                          nodes_to_kill=nodes_to_kill, load_type=load_type, with_zk=False)
 
         return self._perform_node_fail_scenario(test_config)
 
@@ -81,13 +95,14 @@ class DiscoveryTest(IgniteTest):
     @matrix(version=[str(DEV_BRANCH), str(LATEST_2_8)],
             kill_coordinator=[False, True],
             nodes_to_kill=[1, 2],
-            with_load=[False, True])
-    def test_node_fail_zk(self, version, kill_coordinator, nodes_to_kill, with_load):
+            load_type=[ClusterLoad.none, ClusterLoad.atomic, ClusterLoad.transactional])
+    def test_node_fail_zk(self, version, kill_coordinator, nodes_to_kill, load_type):
         """
         Test node failure scenario with ZooKeeperSpi.
+        :param load_type: How to load the cluster during the test: 0 - none; 1 - atomic puts; 2 - transactional puts.
         """
         test_config = DiscoveryTestConfig(version=IgniteVersion(version), kill_coordinator=kill_coordinator,
-                                          nodes_to_kill=nodes_to_kill, with_load=with_load, with_zk=True)
+                                          nodes_to_kill=nodes_to_kill, load_type=load_type, with_zk=True)
 
         return self._perform_node_fail_scenario(test_config)
 
@@ -104,19 +119,35 @@ class DiscoveryTest(IgniteTest):
         ignite_config = IgniteConfiguration(
             version=test_config.version,
             discovery_spi=discovery_spi,
-            failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT
+            failure_detection_timeout=self.FAILURE_DETECTION_TIMEOUT,
+            caches=[CacheConfiguration(name='test-cache', backups=1, atomicity_mode='TRANSACTIONAL' if
+            test_config.load_type == ClusterLoad.transactional else 'ATOMIC')]
         )
 
         servers, start_servers_sec = start_servers(self.test_context, self.NUM_NODES - 1, ignite_config, modules)
 
-        if test_config.with_load:
+        failed_nodes, survived_node = choose_node_to_kill(servers, test_config.kill_coordinator,
+                                                          test_config.nodes_to_kill)
+
+        if test_config.load_type is not ClusterLoad.none:
             load_config = ignite_config._replace(client_mode=True) if test_config.with_zk else \
                 ignite_config._replace(client_mode=True, discovery_spi=from_ignite_cluster(servers))
 
-            start_load_app(self.test_context, ignite_config=load_config, data_amount=self.DATA_AMOUNT, modules=modules)
+            tran_nodes = [n.discovery_info().node_id for n in failed_nodes] \
+                if test_config.load_type == ClusterLoad.transactional else None
+
+            params = {"cacheName": "test-cache",
+                      "range": self.DATA_AMOUNT,
+                      "warmUpRange": self.WARMUP_DATA_AMOUNT,
+                      "targetNodes": tran_nodes,
+                      "transactional": bool(tran_nodes)}
+
+            start_load_app(self.test_context, ignite_config=load_config, params=params, modules=modules)
+
+        data = simulate_nodes_failure(servers, failed_nodes, survived_node)
 
-        data = simulate_nodes_failure(servers, test_config.kill_coordinator, test_config.nodes_to_kill)
         data['Ignite cluster start time (s)'] = start_servers_sec
+
         return data
 
 
@@ -142,7 +173,7 @@ def start_servers(test_context, num_nodes, ignite_config, modules=None):
     return servers, round(monotonic() - start, 1)
 
 
-def start_load_app(test_context, ignite_config, data_amount, modules=None):
+def start_load_app(test_context, ignite_config, params, modules=None):
     """
     Start loader application.
     """
@@ -153,7 +184,7 @@ def start_load_app(test_context, ignite_config, data_amount, modules=None):
         modules=modules,
         # mute spam in log.
         jvm_opts=["-DIGNITE_DUMP_THREADS_ON_FAILURE=false"],
-        params={"cacheName": "test-cache", "range": data_amount})
+        params=params)
 
     loader.start()
 
@@ -186,12 +217,10 @@ def choose_node_to_kill(servers, kill_coordinator, nodes_to_kill):
     return to_kill, survive
 
 
-def simulate_nodes_failure(servers, kill_coordinator, nodes_to_kill):
+def simulate_nodes_failure(servers, failed_nodes, survived_node):
     """
     Perform the node failure scenario.
     """
-    failed_nodes, survived_node = choose_node_to_kill(servers, kill_coordinator, nodes_to_kill)
-
     ids_to_wait = [node.discovery_info().node_id for node in failed_nodes]
 
     _, first_terminated = servers.stop_nodes_async(failed_nodes, clean_shutdown=False, wait_for_stop=False)
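
For reference, the params dict built in _perform_node_fail_scenario() reaches
ContinuousDataLoadApplication.run() as a JSON node and is bound to the private
fields of the Config holder through Jackson. A self-contained sketch of that
binding, using plain Jackson outside of Ignite (the class name is illustrative):

    import java.util.Set;
    import com.fasterxml.jackson.annotation.JsonAutoDetect;
    import com.fasterxml.jackson.annotation.PropertyAccessor;
    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    public class ConfigBindingSketch {
        private static class Config {
            private String cacheName;
            private int range;
            private Set<String> targetNodes;
            private boolean transactional;
            private int warmUpRange;
        }

        public static void main(String[] args) throws Exception {
            ObjectMapper mapper = new ObjectMapper();

            // Same visibility tweak as parseConfig(): bind private fields
            // directly, without getters or setters on Config.
            mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY);

            // JSON mirroring the params dict from discovery_test.py.
            JsonNode node = mapper.readTree(
                "{\"cacheName\": \"test-cache\", \"range\": 5000000,"
                    + " \"warmUpRange\": 10000, \"targetNodes\": null, \"transactional\": false}");

            Config cfg = mapper.treeToValue(node, Config.class);

            System.out.println(cfg.cacheName + ", range=" + cfg.range);
        }
    }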
