This is an automated email from the ASF dual-hosted git repository.

av pushed a commit to branch ignite-ducktape
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/ignite-ducktape by this push:
     new 47d1311  IGNITE-13433 Benchmark confirms operation's latency drop 
decrease on Cellular switch comparing to PME-free switch (#8232)
47d1311 is described below

commit 47d131124996cfc3f8a986a3b7406a364424bbf0
Author: Anton Vinogradov <a...@apache.org>
AuthorDate: Tue Sep 15 10:19:59 2020 +0300

    IGNITE-13433 Benchmark confirms operation's latency drop decrease on 
Cellular switch comparing to PME-free switch (#8232)
---
 .../CellularPreparedTxStreamer.java                |  92 +++++++++++++
 .../cellular_affinity_test/CellularTxStreamer.java | 112 +++++++++++++++
 .../DistributionChecker.java                       |   4 +-
 .../SingleKeyTxStreamerApplication.java            |   2 +-
 .../ducktest/utils/IgniteAwareApplication.java     | 153 ++++++++++++++++-----
 modules/ducktests/tests/docker/run_tests.sh        |   4 +-
 .../tests/ignitetest/services/ignite_app.py        |  54 ++++++--
 .../tests/ignitetest/services/utils/ignite_spec.py |   9 +-
 .../ignitetest/tests/add_node_rebalance_test.py    |   6 -
 .../ignitetest/tests/cellular_affinity_test.py     | 120 +++++++++++++++-
 .../tests/ignitetest/tests/pme_free_switch_test.py |  14 +-
 .../tests/ignitetest/utils/ignite_test.py          |   7 -
 12 files changed, 484 insertions(+), 93 deletions(-)

diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
new file mode 100644
index 0000000..43424eb
--- /dev/null
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularPreparedTxStreamer.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+import 
org.apache.ignite.internal.processors.cache.transactions.TransactionProxyImpl;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.transactions.Transaction;
+
+/**
+ * Starts and prepares (without committing) {@code txCnt} transactions on keys mapped to the specified cell.
+ */
+public class CellularPreparedTxStreamer extends IgniteAwareApplication {
+    /** {@inheritDoc} */
+    @Override protected void run(JsonNode jsonNode) throws Exception {
+        final String cacheName = jsonNode.get("cacheName").asText();
+        final String attr = jsonNode.get("attr").asText(); // Node attribute whose value identifies the cell.
+        final String cell = jsonNode.get("cell").asText(); // Attribute value of the target cell.
+        final int txCnt = jsonNode.get("txCnt").asInt(); // Number of transactions to prepare.
+
+        markInitialized();
+
+        waitForActivation();
+
+        IgniteCache<Integer, Integer> cache = 
ignite.getOrCreateCache(cacheName);
+
+        log.info("Starting Prepared Txs...");
+
+        Affinity<Integer> aff = ignite.affinity(cacheName);
+
+        int cnt = 0;
+        int i = -1; // Negative keys to have no intersection with load.
+
+        while (cnt != txCnt && !terminated()) {
+            Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(i);
+
+            Map<Object, Long> stat = nodes.stream().collect(
+                Collectors.groupingBy(n -> n.attributes().get(attr), 
Collectors.counting()));
+
+            assert 1 == stat.keySet().size() :
+                "Partition should be located on nodes from only one cell " +
+                    "[key=" + i + ", nodes=" + nodes.size() + ", stat=" + stat 
+ "]";
+
+            if (stat.containsKey(cell)) { // Only keys mapped to the target cell are used.
+                cnt++;
+
+                Transaction tx = ignite.transactions().txStart();
+
+                cache.put(i, i);
+
+                ((TransactionProxyImpl<?, ?>)tx).tx().prepare(true); // Prepared only; no commit follows.
+
+                if (cnt % 100 == 0)
+                    log.info("Long Tx prepared [key=" + i + ",cnt=" + cnt + ", 
cell=" + stat.keySet() + "]");
+            }
+
+            i--;
+        }
+
+        log.info("All transactions prepared (" + cnt + ")");
+
+        while (!terminated()) { // Idle until termination is requested.
+            log.info("Waiting for SIGTERM.");
+
+            U.sleep(1000);
+        }
+
+        markFinished();
+    }
+}
diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java
new file mode 100644
index 0000000..ebfc6f2
--- /dev/null
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/CellularTxStreamer.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.ducktest.tests.cellular_affinity_test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
+
+/**
+ * Puts ascending keys mapped to the specified cell and records the 20 worst put latencies seen after warmup.
+ */
+public class CellularTxStreamer extends IgniteAwareApplication {
+    /** {@inheritDoc} */
+    @Override public void run(JsonNode jsonNode) throws Exception {
+        String cacheName = jsonNode.get("cacheName").asText();
+        int warmup = jsonNode.get("warmup").asInt(); // Operations to perform before latencies are recorded.
+        String cell = jsonNode.get("cell").asText(); // Attribute value of the target cell.
+        String attr = jsonNode.get("attr").asText(); // Node attribute whose value identifies the cell.
+
+        markInitialized();
+
+        waitForActivation();
+
+        IgniteCache<Integer, Integer> cache = 
ignite.getOrCreateCache(cacheName);
+
+        long[] max = new long[20]; // Worst latencies (ms), kept in descending order.
+
+        Arrays.fill(max, -1); // -1 marks a slot that has not been filled yet.
+
+        int key = 0;
+
+        int cnt = 0;
+
+        long initTime = 0;
+
+        boolean record = false;
+
+        Affinity<Integer> aff = ignite.affinity(cacheName);
+
+        while (!terminated()) {
+            key++;
+
+            Collection<ClusterNode> nodes = aff.mapKeyToPrimaryAndBackups(key);
+
+            Map<Object, Long> stat = nodes.stream().collect(
+                Collectors.groupingBy(n -> n.attributes().get(attr), 
Collectors.counting()));
+
+            if (!stat.containsKey(cell)) // Skip keys mapped to other cells.
+                continue;
+
+            cnt++;
+
+            long start = System.currentTimeMillis();
+
+            cache.put(key, key);
+
+            long finish = System.currentTimeMillis();
+
+            long time = finish - start;
+
+            if (!record && cnt > warmup) { // First operation past warmup: start recording.
+                record = true;
+
+                initTime = System.currentTimeMillis();
+
+                log.info("Warmup finished");
+            }
+
+            if (record) {
+                for (int i = 0; i < max.length; i++) {
+                    if (max[i] <= time) { // Insert here; smaller values shift right, the last one is dropped.
+                        System.arraycopy(max, i, max, i + 1, max.length - i - 
1);
+
+                        max[i] = time;
+
+                        break;
+                    }
+                }
+            }
+
+            if (cnt % 1000 == 0)
+                log.info("Application streamed " + cnt + " transactions 
[worst_latency=" + Arrays.toString(max) + "]");
+        }
+
+        recordResult("WORST_LATENCY", Arrays.toString(max));
+        recordResult("STREAMED", cnt - warmup); // NOTE(review): negative if terminated before warmup completes.
+        recordResult("MEASURE_DURATION", System.currentTimeMillis() - 
initTime);
+
+        markFinished();
+    }
+}
diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
index 2daf63e..22b2c94 100644
--- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/cellular_affinity_test/DistributionChecker.java
@@ -28,9 +28,7 @@ import 
org.apache.ignite.internal.ducktest.utils.IgniteAwareApplication;
  *
  */
 public class DistributionChecker extends IgniteAwareApplication {
-    /**
-     * {@inheritDoc}
-     */
+    /** {@inheritDoc} */
     @Override protected void run(JsonNode jsonNode) {
         String cacheName = jsonNode.get("cacheName").asText();
         String attr = jsonNode.get("attr").asText();
diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
index 8c8be15..14b53a9 100644
--- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/tests/pme_free_switch_test/SingleKeyTxStreamerApplication.java
@@ -55,7 +55,7 @@ public class SingleKeyTxStreamerApplication extends 
IgniteAwareApplication {
             if (!record && cnt > warmup) {
                 record = true;
 
-                initTime = System.currentTimeMillis();;
+                initTime = System.currentTimeMillis();
 
                 markInitialized();
             }
diff --git 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
index 107787b..a74b53f 100644
--- 
a/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
+++ 
b/modules/ducktests/src/main/java/org/apache/ignite/internal/ducktest/utils/IgniteAwareApplication.java
@@ -17,9 +17,14 @@
 
 package org.apache.ignite.internal.ducktest.utils;
 
+import java.lang.management.ManagementFactory;
+import java.lang.management.ThreadInfo;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.ignite.Ignite;
+import org.apache.ignite.cluster.ClusterState;
+import org.apache.ignite.internal.IgniteEx;
 import org.apache.ignite.internal.IgniteInterruptedCheckedException;
+import 
org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -55,8 +60,8 @@ public abstract class IgniteAwareApplication {
     /** Terminated. */
     private static volatile boolean terminated;
 
-    /** Shutdown hook. */
-    private static volatile Thread hook;
+    /** State mutex. */
+    private static final Object stateMux = new Object();
 
     /** Ignite. */
     protected Ignite ignite;
@@ -68,7 +73,7 @@ public abstract class IgniteAwareApplication {
      * Default constructor.
      */
     protected IgniteAwareApplication() {
-        Runtime.getRuntime().addShutdownHook(hook = new Thread(() -> {
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
             log.info("SIGTERM recorded.");
 
             if (!finished && !broken)
@@ -79,8 +84,13 @@ public abstract class IgniteAwareApplication {
             if (log.isDebugEnabled())
                 log.debug("Waiting for graceful termination...");
 
+            int iter = 0;
+
             while (!finished && !broken) {
-                log.info("Waiting for graceful termination cycle...");
+                log.info("Waiting for graceful termination cycle... [iter=" + 
++iter + "]");
+
+                if (iter == 100)
+                    dumpThreads();
 
                 try {
                     U.sleep(100);
@@ -101,49 +111,72 @@ public abstract class IgniteAwareApplication {
      * Used to marks as started to perform actions. Suitable for async runs.
      */
     protected void markInitialized() {
-        assert !inited;
+        log.info("Marking as initialized.");
 
-        log.info(APP_INITED);
+        synchronized (stateMux) {
+            assert !inited;
+            assert !finished;
+            assert !broken;
 
-        inited = true;
+            log.info(APP_INITED);
+
+            inited = true;
+        }
     }
 
     /**
      *
      */
     protected void markFinished() {
-        assert !finished;
-        assert !broken;
+        log.info("Marking as finished.");
 
-        log.info(APP_FINISHED);
+        synchronized (stateMux) {
+            assert inited;
+            assert !finished;
+            assert !broken;
 
-        if (!terminated())
-            removeShutdownHook();
+            log.info(APP_FINISHED);
 
-        finished = true;
+            finished = true;
+        }
     }
 
     /**
      *
      */
-    private void markBroken() {
-        assert !finished;
-        assert !broken;
+    protected void markBroken(Throwable th) {
+        log.info("Marking as broken.");
 
-        log.info(APP_BROKEN);
+        synchronized (stateMux) {
+            if (broken) {
+                log.info("Already marked as broken.");
 
-        removeShutdownHook();
+                return;
+            }
 
-        broken = true;
+            recordResult("ERROR", th.toString());
+
+            assert !finished;
+
+            log.error(APP_BROKEN);
+
+            broken = true;
+        }
     }
 
     /**
      *
      */
-    private void removeShutdownHook() {
-        Runtime.getRuntime().removeShutdownHook(hook);
+    private void terminate() {
+        log.info("Marking as terminated.");
+
+        synchronized (stateMux) { // Same mutex as the other state transitions.
+            assert !terminated; // Termination must be requested only once.
+
+            log.info(APP_TERMINATED);
 
-        log.info("Shutdown hook removed.");
+            terminated = true;
+        }
     }
 
     /**
@@ -157,17 +190,6 @@ public abstract class IgniteAwareApplication {
     /**
      *
      */
-    private void terminate() {
-        assert !terminated;
-
-        log.info(APP_TERMINATED);
-
-        terminated = true;
-    }
-
-    /**
-     *
-     */
     protected boolean terminated() {
         return terminated;
     }
@@ -226,12 +248,71 @@ public abstract class IgniteAwareApplication {
         catch (Throwable th) {
             log.error("Unexpected Application failure... ", th);
 
-            recordResult("ERROR", th.getMessage());
-
-            markBroken();
+            if (!broken)
+                markBroken(th);
         }
         finally {
             log.info("Application finished.");
         }
     }
+
+    /**
+     * Logs info for every thread returned by the thread MX bean plus the complete stack trace of "main".
+     */
+    private static void dumpThreads() {
+        ThreadInfo[] infos = 
ManagementFactory.getThreadMXBean().dumpAllThreads(true, true);
+
+        for (ThreadInfo info : infos) {
+            log.info(info.toString());
+
+            if ("main".equals(info.getThreadName())) { // toString() may cut the stack short; print main's in full.
+                StringBuilder sb = new StringBuilder();
+
+                sb.append("main\n");
+
+                for (StackTraceElement element : info.getStackTrace()) {
+                    sb.append("\tat ").append(element.toString());
+                    sb.append('\n');
+                }
+
+                log.info(sb.toString());
+            }
+        }
+    }
+
+    /**
+     * Blocks until the cluster is active, polling every 100 ms (state() on 2.9.0+, active() on older versions).
+     */
+    protected void waitForActivation() throws 
IgniteInterruptedCheckedException {
+        boolean newApi = 
ignite.cluster().localNode().version().greaterThanEqual(2, 9, 0);
+
+        while (newApi ? ignite.cluster().state() != ClusterState.ACTIVE : 
!ignite.cluster().active()) {
+            U.sleep(100);
+
+            log.info("Waiting for cluster activation");
+        }
+
+        log.info("Cluster Activated");
+    }
+
+    /**
+     * Polls once a second until the last finished exchange reports rebalanced; unsupported before 2.8.0.
+     */
+    protected void waitForRebalanced() throws 
IgniteInterruptedCheckedException {
+        boolean possible = 
ignite.cluster().localNode().version().greaterThanEqual(2, 8, 0);
+
+        if (possible) {
+            GridCachePartitionExchangeManager<?, ?> mgr = 
((IgniteEx)ignite).context().cache().context().exchange();
+
+            while (!mgr.lastFinishedFuture().rebalanced()) {
+                U.sleep(1000);
+
+                log.info("Waiting for cluster rebalance finish");
+            }
+
+            log.info("Cluster Rebalanced");
+        }
+        else
+            throw new UnsupportedOperationException("Operation supported since 
2.8.0");
+    }
 }
diff --git a/modules/ducktests/tests/docker/run_tests.sh 
b/modules/ducktests/tests/docker/run_tests.sh
index d8eb270..93a8b7c 100755
--- a/modules/ducktests/tests/docker/run_tests.sh
+++ b/modules/ducktests/tests/docker/run_tests.sh
@@ -21,7 +21,7 @@ SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
 # DuckerUp parameters are specified with env variables
 
 # Num of cotainers that ducktape will prepare for tests
-IGNITE_NUM_CONTAINERS=${IGNITE_NUM_CONTAINERS:-11}
+IGNITE_NUM_CONTAINERS=${IGNITE_NUM_CONTAINERS:-13}
 
 # Image name to run nodes
 default_image_name="ducker-ignite-openjdk-8"
@@ -55,7 +55,7 @@ The options are as follows:
     Display this help message.
 
 -n|--num-nodes
-    Specify how many nodes to start. Default number of nodes to start: 11.
+    Specify how many nodes to start. Default number of nodes to start: 13 (12 
+ 1 used by ducktape).
 
 -j|--max-parallel
     Specify max number of tests that can be run in parallel.
diff --git a/modules/ducktests/tests/ignitetest/services/ignite_app.py 
b/modules/ducktests/tests/ignitetest/services/ignite_app.py
index b40d01a..9e396f2 100644
--- a/modules/ducktests/tests/ignitetest/services/ignite_app.py
+++ b/modules/ducktests/tests/ignitetest/services/ignite_app.py
@@ -19,6 +19,9 @@ This module contains the base class to build Ignite aware 
application written on
 
 import re
 
+# pylint: disable=W0622
+from ducktape.errors import TimeoutError
+
 from ignitetest.services.utils.ignite_aware import IgniteAwareService
 
 
@@ -38,7 +41,7 @@ class IgniteApplicationService(IgniteAwareService):
         self.servicejava_class_name = servicejava_class_name
         self.java_class_name = java_class_name
         self.timeout_sec = timeout_sec
-        self.stop_timeout_sec = 10
+        self.params = params
 
     def start(self):
         super().start()
@@ -46,25 +49,46 @@ class IgniteApplicationService(IgniteAwareService):
         self.logger.info("Waiting for Ignite aware Application (%s) to 
start..." % self.java_class_name)
 
         self.await_event("Topology snapshot", self.timeout_sec, 
from_the_beginning=True)
-        
self.await_event("IGNITE_APPLICATION_INITIALIZED\\|IGNITE_APPLICATION_BROKEN", 
self.timeout_sec,
-                         from_the_beginning=True)
 
-        try:
-            self.await_event("IGNITE_APPLICATION_INITIALIZED", 1, 
from_the_beginning=True)
-        except Exception:
-            raise Exception("Java application execution failed. %s" % 
self.extract_result("ERROR")) from None
+        self.__check_status("IGNITE_APPLICATION_INITIALIZED", 
timeout=self.timeout_sec)
 
-    # pylint: disable=W0221
-    def stop_node(self, node, clean_shutdown=True, timeout_sec=20):
-        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, 
str(node.account)))
-        node.account.kill_java_processes(self.servicejava_class_name, 
clean_shutdown=clean_shutdown, allow_fail=True)
+    def stop_async(self, clean_shutdown=True):
+        """
+        Kills the app's java processes on the first node (cleanly by default) without awaiting their exit.
+        """
+        self.logger.info("%s Stopping node %s" % (self.__class__.__name__, 
str(self.nodes[0].account)))
+        self.nodes[0].account.kill_java_processes(self.servicejava_class_name, 
clean_shutdown=clean_shutdown,
+                                                  allow_fail=True)
 
-        stopped = self.wait_node(node, timeout_sec=self.stop_timeout_sec)
+    def await_stopped(self, timeout_sec=10):
+        """
+        Awaits node stop finish.
+        """
+        stopped = self.wait_node(self.nodes[0], timeout_sec=timeout_sec)
         assert stopped, "Node %s: did not stop within the specified timeout of 
%s seconds" % \
-                        (str(node.account), str(self.stop_timeout_sec))
+                        (str(self.nodes[0].account), str(timeout_sec))
 
-        
self.await_event("IGNITE_APPLICATION_FINISHED\\|IGNITE_APPLICATION_BROKEN", 
from_the_beginning=True,
-                         timeout_sec=timeout_sec)
+        self.__check_status("IGNITE_APPLICATION_FINISHED", timeout=timeout_sec)
+
+    # pylint: disable=W0221
+    def stop_node(self, node, clean_shutdown=True, timeout_sec=10):
+        assert node == self.nodes[0]  # stop_async/await_stopped always act on the first node
+        self.stop_async(clean_shutdown)
+        self.await_stopped(timeout_sec)
+
+    def __check_status(self, desired, timeout=1):  # raises unless `desired` was logged and BROKEN was not
+        self.await_event("%s\\|IGNITE_APPLICATION_BROKEN" % desired, timeout, 
from_the_beginning=True)
+
+        try:  # a BROKEN marker means failure, reported with the recorded ERROR result
+            self.await_event("IGNITE_APPLICATION_BROKEN", 1, 
from_the_beginning=True)
+            raise Exception("Java application execution failed. %s" % 
self.extract_result("ERROR"))
+        except TimeoutError:  # presumably raised when no BROKEN marker shows up within 1 second
+            pass
+
+        try:  # the desired marker itself must be present
+            self.await_event(desired, 1, from_the_beginning=True)
+        except Exception:
+            raise Exception("Java application execution failed.") from None
 
     def clean_node(self, node):
         if self.alive(node):
diff --git a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py 
b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
index 124fefb..afb975a 100644
--- a/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
+++ b/modules/ducktests/tests/ignitetest/services/utils/ignite_spec.py
@@ -22,11 +22,10 @@ import importlib
 import json
 from abc import ABCMeta, abstractmethod
 
-from ignitetest.services.utils.ignite_path import IgnitePath
 from ignitetest.services.utils.config_template import 
IgniteClientConfigTemplate, IgniteServerConfigTemplate
-from ignitetest.utils.version import DEV_BRANCH
-
+from ignitetest.services.utils.ignite_path import IgnitePath
 from ignitetest.services.utils.ignite_persistence import IgnitePersistenceAware
+from ignitetest.utils.version import DEV_BRANCH
 
 
 def resolve_spec(service, context, config, **kwargs):
@@ -147,6 +146,8 @@ class ApacheIgniteNodeSpec(IgniteNodeSpec, 
IgnitePersistenceAware):
         libs.append("log4j")
         libs = list(map(lambda m: self.path.module(m) + "/*", libs))
 
+        libs.append(IgnitePath(DEV_BRANCH).module("ducktests") + "/*")
+
         self.envs = {
             'EXCLUDE_TEST_CLASSES': 'true',
             'IGNITE_LOG_DIR': self.PERSISTENT_ROOT,
@@ -183,7 +184,7 @@ class ApacheIgniteApplicationSpec(IgniteApplicationSpec, 
IgnitePersistenceAware)
         }
 
         self.jvm_opts.extend([
-            "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file ",
+            "-DIGNITE_SUCCESS_FILE=" + self.PERSISTENT_ROOT + "/success_file",
             "-Dlog4j.configDebug=true",
             "-DIGNITE_NO_SHUTDOWN_HOOK=true",  # allows to perform operations 
on app termination.
             "-Xmx1G",
diff --git 
a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py 
b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
index 0681424..82bc730 100644
--- a/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/add_node_rebalance_test.py
@@ -47,15 +47,11 @@ class AddNodeRebalanceTest(IgniteTest):
             * Put data to it via IgniteClientApp.
             * Start one more node and awaits for rebalance to finish.
         """
-        self.stage("Start Ignite nodes")
-
         node_config = 
IgniteConfiguration(version=IgniteVersion(ignite_version))
 
         ignites = IgniteService(self.test_context, config=node_config, 
num_nodes=self.NUM_NODES - 1)
         ignites.start()
 
-        self.stage("Starting DataGenerationApplication")
-
         # This client just put some data to the cache.
         app_config = node_config._replace(client_mode=True, 
discovery_spi=from_ignite_cluster(ignites))
         IgniteApplicationService(self.test_context, config=app_config,
@@ -66,8 +62,6 @@ class AddNodeRebalanceTest(IgniteTest):
         ignite = IgniteService(self.test_context, 
node_config._replace(discovery_spi=from_ignite_cluster(ignites)),
                                num_nodes=1)
 
-        self.stage("Starting Ignite node")
-
         ignite.start()
 
         start = self.monotonic()
diff --git a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py 
b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
index c957e43..3c38b22 100644
--- a/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/cellular_affinity_test.py
@@ -22,11 +22,12 @@ from jinja2 import Template
 
 from ignitetest.services.ignite import IgniteService
 from ignitetest.services.ignite_app import IgniteApplicationService
+from ignitetest.services.utils.control_utility import ControlUtility
 from ignitetest.services.utils.ignite_configuration import 
IgniteConfiguration, IgniteClientConfiguration
 from ignitetest.services.utils.ignite_configuration.discovery import 
from_ignite_cluster
 from ignitetest.utils import ignite_versions, version_if
 from ignitetest.utils.ignite_test import IgniteTest
-from ignitetest.utils.version import DEV_BRANCH, IgniteVersion
+from ignitetest.utils.version import DEV_BRANCH, IgniteVersion, LATEST_2_8
 
 
 # pylint: disable=W0223
@@ -40,6 +41,8 @@ class CellularAffinity(IgniteTest):
 
     CACHE_NAME = "test-cache"
 
+    PREPARED_TX_CNT = 500  # possible amount at real cluster under load (per 
cell).
+
     CONFIG_TEMPLATE = """
             <property name="cacheConfiguration">
                 <list>
@@ -55,6 +58,7 @@ class CellularAffinity(IgniteTest):
                         </property>
                         <property name="name" value="{{ cacheName }}"/>
                         <property name="backups" value="{{ nodes }}"/> 
+                        <property name="atomicityMode" value="TRANSACTIONAL"/>
                     </bean>
                 </list>
             </property>
@@ -73,15 +77,17 @@ class CellularAffinity(IgniteTest):
     @cluster(num_nodes=NUM_NODES * 3 + 1)
     @version_if(lambda version: version >= DEV_BRANCH)
     @ignite_versions(str(DEV_BRANCH))
-    def test(self, ignite_version):
+    def test_distribution(self, ignite_version):
         """
-        Test Cellular Affinity scenario (partition distribution).
+        Tests Cellular Affinity scenario (partition distribution).
         """
         cell1 = self.start_cell(ignite_version, ['-D' + 
CellularAffinity.ATTRIBUTE + '=1'])
         self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=2'], joined_cluster=cell1)
         self.start_cell(ignite_version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=XXX', '-DRANDOM=42'],
                         joined_cluster=cell1)
 
+        ControlUtility(cell1, self.test_context).activate()
+
         checker = IgniteApplicationService(
             self.test_context,
             IgniteClientConfiguration(version=IgniteVersion(ignite_version), 
discovery_spi=from_ignite_cluster(cell1)),
@@ -92,15 +98,117 @@ class CellularAffinity(IgniteTest):
 
         checker.run()
 
-    def start_cell(self, version, jvm_opts, joined_cluster=None):
+    # pylint: disable=R0914
+    @cluster(num_nodes=NUM_NODES * (3 + 1))
+    @ignite_versions(str(DEV_BRANCH), str(LATEST_2_8))
+    def test_latency(self, ignite_version):
+        """
+        Tests Cellular switch tx latency.
+        """
+        data = {}
+
+        cell1, prepared_tx_loader1 = 
self.start_cell_with_prepared_txs(ignite_version, "C1")
+        _, prepared_tx_loader2 = 
self.start_cell_with_prepared_txs(ignite_version, "C2", joined_cluster=cell1)
+        _, prepared_tx_loader3 = 
self.start_cell_with_prepared_txs(ignite_version, "C3", joined_cluster=cell1)
+
+        loaders = [prepared_tx_loader1, prepared_tx_loader2, 
prepared_tx_loader3]
+
+        failed_loader = prepared_tx_loader3
+
+        tx_streamer1 = self.start_tx_streamer(ignite_version, "C1", 
joined_cluster=cell1)
+        tx_streamer2 = self.start_tx_streamer(ignite_version, "C2", 
joined_cluster=cell1)
+        tx_streamer3 = self.start_tx_streamer(ignite_version, "C3", 
joined_cluster=cell1)
+
+        streamers = [tx_streamer1, tx_streamer2, tx_streamer3]
+
+        for streamer in streamers:  # starts tx streaming with latency record (with some warmup).
+            streamer.start()
+
+        ControlUtility(cell1, 
self.test_context).disable_baseline_auto_adjust()  # baseline set.
+        ControlUtility(cell1, self.test_context).activate()
+
+        for loader in loaders:
+            loader.await_event("All transactions prepared", 180, 
from_the_beginning=True)
+
+        for streamer in streamers:
+            streamer.await_event("Warmup finished", 180, 
from_the_beginning=True)
+
+        failed_loader.stop_async()  # node left with prepared txs.
+
+        for streamer in streamers:
+            streamer.await_event("Node left topology\\|Node FAILED", 60, 
from_the_beginning=True)
+
+        for streamer in streamers:  # just an assertion that we have PME-free switch.
+            streamer.await_event("exchangeFreeSwitch=true", 60, 
from_the_beginning=True)
+
+        for streamer in streamers:  # waiting for streaming continuation.
+            streamer.await_event("Application streamed", 60)
+
+        for streamer in streamers:  # stops streaming and records results.
+            streamer.stop_async()
+
+        for streamer in streamers:
+            streamer.await_stopped()
+
+            cell = streamer.params["cell"]
+
+            data["[%s cell %s]" % ("alive" if cell is not 
failed_loader.params["cell"] else "broken", cell)] = \
+                "worst_latency=%s, tx_streamed=%s, measure_duration=%s" % (
+                    streamer.extract_result("WORST_LATENCY"), 
streamer.extract_result("STREAMED"),
+                    streamer.extract_result("MEASURE_DURATION"))
+
+        return data
+
+    def start_tx_streamer(self, version, cell, joined_cluster):
+        """
+        Starts transaction streamer.
+        """
+        return IgniteApplicationService(
+            self.test_context,
+            IgniteClientConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
+                                      
discovery_spi=from_ignite_cluster(joined_cluster)),
+            
java_class_name="org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularTxStreamer",
+            params={"cacheName": CellularAffinity.CACHE_NAME,
+                    "attr": CellularAffinity.ATTRIBUTE,
+                    "cell": cell,
+                    "warmup": 10000},
+            timeout_sec=180)
+
+    def start_cell_with_prepared_txs(self, version, cell_id, 
joined_cluster=None):
+        """
+        Starts cell with prepared transactions.
+        """
+        nodes = self.start_cell(version, ['-D' + CellularAffinity.ATTRIBUTE + 
'=' + cell_id],
+                                CellularAffinity.NUM_NODES - 1, joined_cluster)
+
+        prepared_tx_streamer = IgniteApplicationService(  # last server node at the cell.
+            self.test_context,
+            IgniteConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
+                                discovery_spi=from_ignite_cluster(nodes)),  # Server node.
+            java_class_name=
+            
"org.apache.ignite.internal.ducktest.tests.cellular_affinity_test.CellularPreparedTxStreamer",
+            params={"cacheName": CellularAffinity.CACHE_NAME,
+                    "attr": CellularAffinity.ATTRIBUTE,
+                    "cell": cell_id,
+                    "txCnt": CellularAffinity.PREPARED_TX_CNT},
+            jvm_opts=['-D' + CellularAffinity.ATTRIBUTE + '=' + cell_id],
+            timeout_sec=180)
+
+        prepared_tx_streamer.start()  # starts last server node and creates prepared txs on it.
+
+        return nodes, prepared_tx_streamer
+
+    def start_cell(self, version, jvm_opts, nodes_cnt=NUM_NODES, 
joined_cluster=None):
         """
         Starts cell.
         """
-        config = IgniteConfiguration(version=IgniteVersion(version), 
properties=self.properties())
+        config = IgniteConfiguration(version=IgniteVersion(version), 
properties=self.properties(),
+                                     cluster_state="INACTIVE")
+
         if joined_cluster:
             config = 
config._replace(discovery_spi=from_ignite_cluster(joined_cluster))
 
-        ignites = IgniteService(self.test_context, config, 
num_nodes=CellularAffinity.NUM_NODES, jvm_opts=jvm_opts)
+        ignites = IgniteService(self.test_context, config, 
num_nodes=nodes_cnt, jvm_opts=jvm_opts)
 
         ignites.start()
 
diff --git a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py 
b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
index 395b24b..3b54762 100644
--- a/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
+++ b/modules/ducktests/tests/ignitetest/tests/pme_free_switch_test.py
@@ -43,12 +43,10 @@ class PmeFreeSwitchTest(IgniteTest):
     @ignite_versions(str(DEV_BRANCH), str(LATEST_2_7))
     def test(self, ignite_version):
         """
-        Test PME free scenario (node stop).
+        Tests PME free scenario (node stop).
         """
         data = {}
 
-        self.stage("Starting nodes")
-
         config = IgniteConfiguration(
             version=IgniteVersion(ignite_version),
             caches=[CacheConfiguration(name='test-cache', backups=2, 
atomicity_mode='TRANSACTIONAL')]
@@ -58,8 +56,6 @@ class PmeFreeSwitchTest(IgniteTest):
 
         ignites.start()
 
-        self.stage("Starting long_tx_streamer")
-
         client_config = config._replace(client_mode=True,
                                         
discovery_spi=from_ignite_cluster(ignites, slice(0, self.NUM_NODES - 1)))
 
@@ -71,8 +67,6 @@ class PmeFreeSwitchTest(IgniteTest):
 
         long_tx_streamer.start()
 
-        self.stage("Starting single_key_tx_streamer")
-
         single_key_tx_streamer = IgniteApplicationService(
             self.test_context,
             client_config,
@@ -85,20 +79,14 @@ class PmeFreeSwitchTest(IgniteTest):
         if IgniteVersion(ignite_version) >= V_2_8_0:
             ControlUtility(ignites, 
self.test_context).disable_baseline_auto_adjust()
 
-        self.stage("Stopping server node")
-
         ignites.stop_node(ignites.nodes[self.NUM_NODES - 1])
 
         long_tx_streamer.await_event("Node left topology", 60, 
from_the_beginning=True)
 
         time.sleep(30)  # keeping txs alive for 30 seconds.
 
-        self.stage("Stopping long_tx_streamer")
-
         long_tx_streamer.stop()
 
-        self.stage("Stopping single_key_tx_streamer")
-
         single_key_tx_streamer.stop()
 
         data["Worst latency (ms)"] = 
single_key_tx_streamer.extract_result("WORST_LATENCY")
diff --git a/modules/ducktests/tests/ignitetest/utils/ignite_test.py 
b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
index a4bfe7d..feb5993 100644
--- a/modules/ducktests/tests/ignitetest/utils/ignite_test.py
+++ b/modules/ducktests/tests/ignitetest/utils/ignite_test.py
@@ -29,13 +29,6 @@ class IgniteTest(Test):
     def __init__(self, test_context):
         super().__init__(test_context=test_context)
 
-    def stage(self, msg):
-        """
-        Print stage mark.
-        :param msg: Stage mark message.
-        """
-        self.logger.info("[TEST_STAGE] " + msg)
-
     @staticmethod
     def monotonic():
         """

Reply via email to