This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cca05ca  KAFKA-8331: stream static membership system test (#6877)
cca05ca is described below

commit cca05cace4105c829f303c13eed8ace2efd7fa0c
Author: Boyang Chen <boy...@confluent.io>
AuthorDate: Fri Jun 7 13:52:12 2019 -0700

    KAFKA-8331: stream static membership system test (#6877)
    
    As the title suggests, we start a stream job with 3 instances and a
    one-minute session timeout; once the group is stable, we perform a couple
    of rolling bounces of the entire cluster. Every rejoin triggered by a
    restart should cause no generation bump on the client side.
    
    Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com>
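    
    For context, static membership is enabled purely through consumer-side
    configuration: a stable group.instance.id plus a generous session.timeout.ms
    lets a bounced instance rejoin under its previous identity instead of
    forcing a rebalance. Below is a minimal sketch of such a Streams
    configuration; the class name, application id, bootstrap address, instance
    id, and topic are illustrative and not taken from this commit:
    
        import java.util.Properties;
    
        import org.apache.kafka.clients.consumer.ConsumerConfig;
        import org.apache.kafka.streams.KafkaStreams;
        import org.apache.kafka.streams.StreamsBuilder;
        import org.apache.kafka.streams.StreamsConfig;
    
        public class StaticMembershipSketch {
            public static void main(final String[] args) {
                final Properties config = new Properties();
                config.put(StreamsConfig.APPLICATION_ID_CONFIG, "static-member-app"); // illustrative
                config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative
                // A stable, per-instance id opts the embedded consumers into static membership.
                config.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), "instance-1");
                // One-minute session timeout, as in this test, gives a restarted
                // instance time to rejoin before the broker evicts it.
                config.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 60000);
    
                final StreamsBuilder builder = new StreamsBuilder();
                builder.stream("input").foreach((k, v) -> { }); // trivial topology; topic illustrative
                final KafkaStreams streams = new KafkaStreams(builder.build(), config);
                streams.start();
                Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
            }
        }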
---
 .../consumer/internals/AbstractCoordinator.java    |  5 ++
 .../consumer/internals/ConsumerCoordinator.java    |  3 +-
 .../runtime/distributed/WorkerCoordinator.java     |  1 +
 .../streams/tests/StaticMemberTestClient.java      | 84 ++++++++++++++++++++++
 .../streams/tests/StreamsNamedRepartitionTest.java |  2 -
 .../{streams_property.py => consumer_property.py}  | 11 +--
 tests/kafkatest/services/streams.py                | 22 ++++++
 tests/kafkatest/services/streams_property.py       |  4 --
 .../streams_named_repartition_topic_test.py        | 35 ++-------
 .../tests/streams/streams_optimized_test.py        | 21 ++----
 ...c_test.py => streams_static_membership_test.py} | 83 ++++++++++-----------
 .../tests/streams/streams_upgrade_test.py          | 24 +++----
 .../streams/utils/__init__.py}                     | 12 +---
 tests/kafkatest/tests/streams/utils/util.py        | 36 ++++++++++
 14 files changed, 211 insertions(+), 132 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 54678f7..73563fd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -802,6 +802,11 @@ public abstract class AbstractCoordinator implements Closeable {
         return generation;
     }
 
+    protected synchronized String memberId() {
+        return generation == null ? JoinGroupRequest.UNKNOWN_MEMBER_ID :
+                generation.memberId;
+    }
+
     /**
      * Check whether given generation id is matching the record within current generation.
      * Only using in unit tests.
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 5d39da5..64bf17d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -585,7 +585,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     // visible for testing
     void invokeCompletedOffsetCommitCallbacks() {
         if (asyncCommitFenced.get()) {
-            throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId.orElse("unset_instance_id"));
+            throw new FencedInstanceIdException("Get fenced exception for group.instance.id: " +
+                    groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId());
         }
         while (true) {
             OffsetCommitCompletion completion = completedOffsetCommits.poll();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index fd7c7a4..230a272 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -227,6 +227,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return super.rejoinNeededOrPending() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested;
     }
 
+    @Override
     public String memberId() {
         Generation generation = generation();
         if (generation != null)
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
new file mode 100644
index 0000000..96ccad4
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+
+import java.util.Objects;
+import java.util.Properties;
+
+public class StaticMemberTestClient {
+
+    private static String testName = "StaticMemberTestClient";
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) throws Exception {
+        if (args.length < 1) {
+            System.err.println(testName + " requires one argument (properties-file) but none provided: ");
+        }
+
+        System.out.println("StreamsTest instance started");
+
+        final String propFileName = args[0];
+
+        final Properties streamsProperties = Utils.loadProps(propFileName);
+
+        final String groupInstanceId = Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG));
+
+        System.out.println(testName + " instance started with group.instance.id " + groupInstanceId);
+        System.out.println("props=" + streamsProperties);
+        System.out.flush();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic")));
+
+        final KStream dataStream = builder.stream(inputTopic);
+        dataStream.peek((k, v) ->  System.out.println(String.format("PROCESSED key=%s value=%s", k, v)));
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        config.putAll(streamsProperties);
+
+        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+        streams.setStateListener((newState, oldState) -> {
+            if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) {
+                System.out.println("REBALANCING -> RUNNING");
+                System.out.flush();
+            }
+        });
+
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("Static membership test closed");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
index 911716f..c408d9f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java
@@ -117,7 +117,5 @@ public class StreamsNamedRepartitionTest {
             System.out.println("NAMED_REPARTITION_TEST Streams Stopped");
             System.out.flush();
         }));
-
     }
-
 }
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/consumer_property.py
similarity index 84%
copy from tests/kafkatest/services/streams_property.py
copy to tests/kafkatest/services/consumer_property.py
index 054ea64..0a9756a 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/services/consumer_property.py
@@ -14,13 +14,8 @@
 # limitations under the License.
 
 """
-Define Streams configuration property names here.
+Define Consumer configuration property names here.
 """
 
-STATE_DIR = "state.dir"
-KAFKA_SERVERS = "bootstrap.servers"
-NUM_THREADS = "num.stream.threads"
-
-
-
-
+GROUP_INSTANCE_ID = "group.instance.id"
+SESSION_TIMEOUT_MS = "session.timeout.ms"
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 5e2c2e9..70564b9 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -16,6 +16,7 @@
 import os.path
 import signal
 import streams_property
+import consumer_property
 from ducktape.services.service import Service
 from ducktape.utils.util import wait_until
 from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
@@ -534,3 +535,24 @@ class StreamsNamedRepartitionTopicService(StreamsTestBaseService):
 
         cfg = KafkaConfig(**properties)
         return cfg.render()
+
+class StaticMemberTestService(StreamsTestBaseService):
+    def __init__(self, test_context, kafka, group_instance_id, num_threads):
+        super(StaticMemberTestService, self).__init__(test_context,
+                                                      kafka,
+                                                      "org.apache.kafka.streams.tests.StaticMemberTestClient",
+                                                      "")
+        self.INPUT_TOPIC = None
+        self.GROUP_INSTANCE_ID = group_instance_id
+        self.NUM_THREADS = num_threads
+    def prop_file(self):
+        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
+                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
+                      streams_property.NUM_THREADS: self.NUM_THREADS,
+                      consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID,
+                      consumer_property.SESSION_TIMEOUT_MS: 60000}
+
+        properties['input.topic'] = self.INPUT_TOPIC
+
+        cfg = KafkaConfig(**properties)
+        return cfg.render()
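
For reference, a properties file rendered by prop_file() above would look
roughly like the following (the state.dir path and broker address are
illustrative; the thread count, instance id, session timeout, and topic match
the test added further below):

    state.dir=/mnt/streams
    bootstrap.servers=worker1:9092
    num.stream.threads=3
    group.instance.id=consumer-A
    session.timeout.ms=60000
    input.topic=inputTopic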
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py
index 054ea64..99f0ece 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/services/streams_property.py
@@ -20,7 +20,3 @@ Define Streams configuration property names here.
 STATE_DIR = "state.dir"
 KAFKA_SERVERS = "bootstrap.servers"
 NUM_THREADS = "num.stream.threads"
-
-
-
-
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
index b9894ee..1fcdd5f 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
@@ -13,14 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsNamedRepartitionTopicService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-
+from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running
 
 class StreamsNamedRepartitionTopicTest(Test):
     """
@@ -32,6 +30,7 @@ class StreamsNamedRepartitionTopicTest(Test):
     input_topic = 'inputTopic'
     aggregation_topic = 'aggregationTopic'
     pattern = 'AGGREGATED'
+    stopped_message = 'NAMED_REPARTITION_TEST Streams Stopped'
 
     def __init__(self, test_context):
         super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
@@ -66,43 +65,25 @@ class StreamsNamedRepartitionTopicTest(Test):
         for processor in processors:
             processor.CLEAN_NODE_ENABLED = False
             self.set_topics(processor)
-            self.verify_running(processor, 'REBALANCING -> RUNNING')
+            verify_running(processor, 'REBALANCING -> RUNNING')
 
         self.verify_processing(processors)
 
         # do rolling upgrade
         for processor in processors:
-            self.verify_stopped(processor)
+            verify_stopped(processor, self.stopped_message)
             #  will tell app to add operations before repartition topic
             processor.ADD_ADDITIONAL_OPS = 'true'
-            self.verify_running(processor, 'UPDATED Topology')
+            verify_running(processor, 'UPDATED Topology')
 
         self.verify_processing(processors)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         self.producer.stop()
         self.kafka.stop()
         self.zookeeper.stop()
 
-    @staticmethod
-    def verify_running(processor, message):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            monitor.wait_until(message,
-                               timeout_sec=60,
-                               err_msg="Never saw '%s' message " % message + str(processor.node.account))
-
-    @staticmethod
-    def verify_stopped(processor):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.stop()
-            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
-                               timeout_sec=60,
-                               err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account))
-
     def verify_processing(self, processors):
         for processor in processors:
             with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
@@ -110,10 +91,6 @@ class StreamsNamedRepartitionTopicTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))
 
-    def stop_processors(self, processors):
-        for processor in processors:
-            self.verify_stopped(processor)
-
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic
         processor.AGGREGATION_TOPIC = self.aggregation_topic
diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py
index 31efc0d..ecd84c2 100644
--- a/tests/kafkatest/tests/streams/streams_optimized_test.py
+++ b/tests/kafkatest/tests/streams/streams_optimized_test.py
@@ -15,12 +15,11 @@
 
 import time
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsOptimizedUpgradeTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-
+from kafkatest.tests.streams.utils import stop_processors
 
 class StreamsOptimizedTest(Test):
     """
@@ -33,6 +32,7 @@ class StreamsOptimizedTest(Test):
     reduce_topic = 'reduceTopic'
     join_topic = 'joinTopic'
     operation_pattern = 'AGGREGATED\|REDUCED\|JOINED'
+    stopped_message = 'OPTIMIZE_TEST Streams Stopped'
 
     def __init__(self, test_context):
         super(StreamsOptimizedTest, self).__init__(test_context)
@@ -75,7 +75,7 @@ class StreamsOptimizedTest(Test):
 
         self.verify_processing(processors, verify_individual_operations=False)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         # start again with topology optimized
         for processor in processors:
@@ -84,7 +84,7 @@ class StreamsOptimizedTest(Test):
 
         self.verify_processing(processors, verify_individual_operations=True)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         self.producer.stop()
         self.kafka.stop()
@@ -100,15 +100,6 @@ class StreamsOptimizedTest(Test):
                               err_msg="Never saw 'REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' message "
                                       % repartition_topic_count + str(processor.node.account))
 
-    @staticmethod
-    def verify_stopped(processor):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.stop()
-            monitor.wait_until('OPTIMIZE_TEST Streams Stopped',
-                               timeout_sec=60,
-                               err_msg="'OPTIMIZE_TEST Streams Stopped' message" + str(processor.node.account))
-
     def verify_processing(self, processors, verify_individual_operations):
         for processor in processors:
             if not self.all_source_subtopology_tasks(processor):
@@ -139,10 +130,6 @@ class StreamsOptimizedTest(Test):
 
         return False
 
-    def stop_processors(self, processors):
-        for processor in processors:
-            self.verify_stopped(processor)
-
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic
         processor.AGGREGATION_TOPIC = self.aggregation_topic
diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
similarity index 53%
copy from tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
copy to tests/kafkatest/tests/streams/streams_static_membership_test.py
index b9894ee..a466ea8 100644
--- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -13,31 +13,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import time
 from ducktape.tests.test import Test
-from ducktape.utils.util import wait_until
 from kafkatest.services.kafka import KafkaService
-from kafkatest.services.streams import StreamsNamedRepartitionTopicService
+from kafkatest.services.streams import StaticMemberTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs
 
-
-class StreamsNamedRepartitionTopicTest(Test):
+class StreamsStaticMembershipTest(Test):
     """
-    Tests using a named repartition topic by starting
-    application then doing a rolling upgrade with added
-    operations and the application still runs
+    Tests using static membership when broker points to minimum supported
+    version (2.3) or higher.
     """
 
     input_topic = 'inputTopic'
-    aggregation_topic = 'aggregationTopic'
-    pattern = 'AGGREGATED'
+    pattern = 'PROCESSED'
+    running_message = 'REBALANCING -> RUNNING'
+    stopped_message = 'Static membership test closed'
 
     def __init__(self, test_context):
-        super(StreamsNamedRepartitionTopicTest, self).__init__(test_context)
+        super(StreamsStaticMembershipTest, self).__init__(test_context)
         self.topics = {
-            self.input_topic: {'partitions': 6},
-            self.aggregation_topic: {'partitions': 6}
+            self.input_topic: {'partitions': 18},
         }
 
         self.zookeeper = ZookeeperService(self.test_context, num_nodes=1)
@@ -51,13 +48,14 @@ class StreamsNamedRepartitionTopicTest(Test):
                                            throughput=1000,
                                            acks=1)
 
-    def test_upgrade_topology_with_named_repartition_topic(self):
+    def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self):
         self.zookeeper.start()
         self.kafka.start()
 
-        processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
-        processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
-        processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka)
+        numThreads = 3
+        processor1 = StaticMemberTestService(self.test_context, self.kafka, "consumer-A", numThreads)
+        processor2 = StaticMemberTestService(self.test_context, self.kafka, "consumer-B", numThreads)
+        processor3 = StaticMemberTestService(self.test_context, self.kafka, "consumer-C", numThreads)
 
         processors = [processor1, processor2, processor3]
 
@@ -66,43 +64,39 @@ class StreamsNamedRepartitionTopicTest(Test):
         for processor in processors:
             processor.CLEAN_NODE_ENABLED = False
             self.set_topics(processor)
-            self.verify_running(processor, 'REBALANCING -> RUNNING')
+            verify_running(processor, self.running_message)
 
         self.verify_processing(processors)
 
-        # do rolling upgrade
+        # do several rolling bounces
+        num_bounces = 3
+        for i in range(0, num_bounces):
+            for processor in processors:
+                verify_stopped(processor, self.stopped_message)
+                verify_running(processor, self.running_message)
+
+        stable_generation = -1
         for processor in processors:
-            self.verify_stopped(processor)
-            #  will tell app to add operations before repartition topic
-            processor.ADD_ADDITIONAL_OPS = 'true'
-            self.verify_running(processor, 'UPDATED Topology')
+            generations = extract_generation_from_logs(processor)
+            num_bounce_generations = num_bounces * numThreads
+            assert num_bounce_generations <= len(generations), \
+                "Smaller than minimum expected %d generation messages, actual %d" % (num_bounce_generations, len(generations))
+
+            for generation in generations[-num_bounce_generations:]:
+                generation = int(generation)
+                if stable_generation == -1:
+                    stable_generation = generation
+                assert stable_generation == generation, \
+                    "Stream rolling bounce have caused unexpected generation bump %d" % generation
 
         self.verify_processing(processors)
 
-        self.stop_processors(processors)
+        stop_processors(processors, self.stopped_message)
 
         self.producer.stop()
         self.kafka.stop()
         self.zookeeper.stop()
 
-    @staticmethod
-    def verify_running(processor, message):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            monitor.wait_until(message,
-                               timeout_sec=60,
-                               err_msg="Never saw '%s' message " % message + str(processor.node.account))
-
-    @staticmethod
-    def verify_stopped(processor):
-        node = processor.node
-        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.stop()
-            monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped',
-                               timeout_sec=60,
-                               err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account))
-
     def verify_processing(self, processors):
         for processor in processors:
             with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
@@ -110,10 +104,5 @@ class StreamsNamedRepartitionTopicTest(Test):
                                    timeout_sec=60,
                                    err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account))
 
-    def stop_processors(self, processors):
-        for processor in processors:
-            self.verify_stopped(processor)
-
     def set_topics(self, processor):
         processor.INPUT_TOPIC = self.input_topic
-        processor.AGGREGATION_TOPIC = self.aggregation_topic
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 37ab770..6a6a8bc 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,6 +23,7 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
     StreamsUpgradeTestJobRunnerService
 from kafkatest.services.zookeeper import ZookeeperService
+from kafkatest.tests.streams.utils import extract_generation_from_logs
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
@@ -45,7 +46,7 @@ anyone can verify that by calling
 curl https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz
 to download the jar
 and if it is not uploaded yet, ping the dev@kafka mailing list to request it being uploaded.
 
-This test needs to get updated, but this requires several steps
+This test needs to get updated, but this requires several steps,
 which are outlined here:
 
 1. Update all relevant versions in tests/kafkatest/version.py this will include adding a new version for the new
@@ -57,17 +58,17 @@ which are outlined here:
    during the system test run.
    
 3. Update the vagrant/bash.sh file to include all new versions, including the newly released version
-   and all point releases for existing releases.  You only need to list the latest version in
+   and all point releases for existing releases. You only need to list the latest version in
    this file.
    
 4. Then update all relevant versions in the tests/docker/Dockerfile
 
-5. Add a new "upgrade-system-tests-XXXX module under streams.  You can probably just copy the
-   latest system test module from the last release.  Just make sure to update the systout print
-   statement in StreamsUpgradeTest to the version for the release.  After you add the new module
+5. Add a new upgrade-system-tests-XXXX module under streams. You can probably just copy the
+   latest system test module from the last release. Just make sure to update the systout print
+   statement in StreamsUpgradeTest to the version for the release. After you add the new module
    you'll need to update settings.gradle file to include the name of the module you just created
-   for gradle to recognize the newly added module
-   
+   for gradle to recognize the newly added module.
+
 6. Then you'll need to update any version changes in gradle/dependencies.gradle
 
 """
@@ -598,9 +599,9 @@ class StreamsUpgradeTest(Test):
                     retries = 0
 
                     while retries < 10:
-                        processor_found = self.extract_generation_from_logs(processor)
-                        first_other_processor_found = self.extract_generation_from_logs(first_other_processor)
-                        second_other_processor_found = self.extract_generation_from_logs(second_other_processor)
+                        processor_found = extract_generation_from_logs(processor)
+                        first_other_processor_found = extract_generation_from_logs(first_other_processor)
+                        second_other_processor_found = extract_generation_from_logs(second_other_processor)
 
                         if len(processor_found) > 0 and len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0:
                             self.logger.info("processor: " + str(processor_found))
@@ -632,9 +633,6 @@ class StreamsUpgradeTest(Test):
 
         return current_generation
 
-    def extract_generation_from_logs(self, processor):
-        return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
-
     def extract_highest_generation(self, found_generations):
         return int(found_generations[-1])
 
diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/tests/streams/utils/__init__.py
similarity index 83%
copy from tests/kafkatest/services/streams_property.py
copy to tests/kafkatest/tests/streams/utils/__init__.py
index 054ea64..6d2957f 100644
--- a/tests/kafkatest/services/streams_property.py
+++ b/tests/kafkatest/tests/streams/utils/__init__.py
@@ -13,14 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-"""
-Define Streams configuration property names here.
-"""
-
-STATE_DIR = "state.dir"
-KAFKA_SERVERS = "bootstrap.servers"
-NUM_THREADS = "num.stream.threads"
-
-
-
-
+from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs
diff --git a/tests/kafkatest/tests/streams/utils/util.py b/tests/kafkatest/tests/streams/utils/util.py
new file mode 100644
index 0000000..683b199
--- /dev/null
+++ b/tests/kafkatest/tests/streams/utils/util.py
@@ -0,0 +1,36 @@
+# Copyright 2015 Confluent Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+def verify_running(processor, message):
+    node = processor.node
+    with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+        processor.start()
+        monitor.wait_until(message,
+                           timeout_sec=60,
+                           err_msg="Never saw '%s' message " % message + str(processor.node.account))
+
+def verify_stopped(processor, message):
+    node = processor.node
+    with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+        processor.stop()
+        monitor.wait_until(message,
+                           timeout_sec=60,
+                           err_msg="'%s' message " % message + str(processor.node.account))
+
+def stop_processors(processors, stopped_message):
+    for processor in processors:
+        verify_stopped(processor, stopped_message)
+
+def extract_generation_from_logs(processor):
+    return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
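
For reference, the grep/awk pipeline above matches AbstractCoordinator log
lines of roughly the following shape (timestamp and client/group ids elided)
and prints only the token between "generation" and the logger name, i.e. the
generation number, here 5:

    INFO [Consumer clientId=..., groupId=...] Successfully joined group with generation 5 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)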
