This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new cca05ca KAFKA-8331: stream static membership system test (#6877) cca05ca is described below commit cca05cace4105c829f303c13eed8ace2efd7fa0c Author: Boyang Chen <boy...@confluent.io> AuthorDate: Fri Jun 7 13:52:12 2019 -0700 KAFKA-8331: stream static membership system test (#6877) As title suggested, we boost 3 stream instances stream job with one minute session timeout, and once the group is stable, doing couple of rolling bounces for the entire cluster. Every rejoin based on restart should have no generation bump on the client side. Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../consumer/internals/AbstractCoordinator.java | 5 ++ .../consumer/internals/ConsumerCoordinator.java | 3 +- .../runtime/distributed/WorkerCoordinator.java | 1 + .../streams/tests/StaticMemberTestClient.java | 84 ++++++++++++++++++++++ .../streams/tests/StreamsNamedRepartitionTest.java | 2 - .../{streams_property.py => consumer_property.py} | 11 +-- tests/kafkatest/services/streams.py | 22 ++++++ tests/kafkatest/services/streams_property.py | 4 -- .../streams_named_repartition_topic_test.py | 35 ++------- .../tests/streams/streams_optimized_test.py | 21 ++---- ...c_test.py => streams_static_membership_test.py} | 83 ++++++++++----------- .../tests/streams/streams_upgrade_test.py | 24 +++---- .../streams/utils/__init__.py} | 12 +--- tests/kafkatest/tests/streams/utils/util.py | 36 ++++++++++ 14 files changed, 211 insertions(+), 132 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 54678f7..73563fd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -802,6 +802,11 @@ public abstract class AbstractCoordinator implements Closeable { return generation; } + protected synchronized String memberId() { + return generation == null ? JoinGroupRequest.UNKNOWN_MEMBER_ID : + generation.memberId; + } + /** * Check whether given generation id is matching the record within current generation. * Only using in unit tests. diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5d39da5..64bf17d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -585,7 +585,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // visible for testing void invokeCompletedOffsetCommitCallbacks() { if (asyncCommitFenced.get()) { - throw new FencedInstanceIdException("Get fenced exception for group.instance.id " + groupInstanceId.orElse("unset_instance_id")); + throw new FencedInstanceIdException("Get fenced exception for group.instance.id: " + + groupInstanceId.orElse("unset_instance_id") + ", current member.id is " + memberId()); } while (true) { OffsetCommitCompletion completion = completedOffsetCommits.poll(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index fd7c7a4..230a272 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -227,6 +227,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable return super.rejoinNeededOrPending() || (assignmentSnapshot == null || assignmentSnapshot.failed()) || rejoinRequested; } + @Override public String memberId() { Generation generation = generation(); if (generation != null) diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java new file mode 100644 index 0000000..96ccad4 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StaticMemberTestClient.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.tests; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStream; + +import java.util.Objects; +import java.util.Properties; + +public class StaticMemberTestClient { + + private static String testName = "StaticMemberTestClient"; + + @SuppressWarnings("unchecked") + public static void main(final String[] args) throws Exception { + if (args.length < 1) { + System.err.println(testName + " requires one argument (properties-file) but none provided: "); + } + + System.out.println("StreamsTest instance started"); + + final String propFileName = args[0]; + + final Properties streamsProperties = Utils.loadProps(propFileName); + + final String groupInstanceId = Objects.requireNonNull(streamsProperties.getProperty(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG)); + + System.out.println(testName + " instance started with group.instance.id " + groupInstanceId); + System.out.println("props=" + streamsProperties); + System.out.flush(); + + final StreamsBuilder builder = new StreamsBuilder(); + final String inputTopic = (String) (Objects.requireNonNull(streamsProperties.remove("input.topic"))); + + final KStream dataStream = builder.stream(inputTopic); + dataStream.peek((k, v) -> System.out.println(String.format("PROCESSED key=%s value=%s", k, v))); + + final Properties config = new Properties(); + config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, testName); + config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000); + + config.putAll(streamsProperties); + + final KafkaStreams streams = new KafkaStreams(builder.build(), config); + streams.setStateListener((newState, oldState) -> { + if (oldState == KafkaStreams.State.REBALANCING && newState == KafkaStreams.State.RUNNING) { + System.out.println("REBALANCING -> RUNNING"); + System.out.flush(); + } + }); + + streams.start(); + + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + System.out.println("closing Kafka Streams instance"); + System.out.flush(); + streams.close(); + System.out.println("Static membership test closed"); + System.out.flush(); + } + }); + } +} diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java index 911716f..c408d9f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsNamedRepartitionTest.java @@ -117,7 +117,5 @@ public class StreamsNamedRepartitionTest { System.out.println("NAMED_REPARTITION_TEST Streams Stopped"); System.out.flush(); })); - } - } diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/consumer_property.py similarity index 84% copy from tests/kafkatest/services/streams_property.py copy to tests/kafkatest/services/consumer_property.py index 054ea64..0a9756a 100644 --- a/tests/kafkatest/services/streams_property.py +++ b/tests/kafkatest/services/consumer_property.py @@ -14,13 +14,8 @@ # limitations under the License. """ -Define Streams configuration property names here. +Define Consumer configuration property names here. """ -STATE_DIR = "state.dir" -KAFKA_SERVERS = "bootstrap.servers" -NUM_THREADS = "num.stream.threads" - - - - +GROUP_INSTANCE_ID = "group.instance.id" +SESSION_TIMEOUT_MS = "session.timeout.ms" diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py index 5e2c2e9..70564b9 100644 --- a/tests/kafkatest/services/streams.py +++ b/tests/kafkatest/services/streams.py @@ -16,6 +16,7 @@ import os.path import signal import streams_property +import consumer_property from ducktape.services.service import Service from ducktape.utils.util import wait_until from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin @@ -534,3 +535,24 @@ class StreamsNamedRepartitionTopicService(StreamsTestBaseService): cfg = KafkaConfig(**properties) return cfg.render() + +class StaticMemberTestService(StreamsTestBaseService): + def __init__(self, test_context, kafka, group_instance_id, num_threads): + super(StaticMemberTestService, self).__init__(test_context, + kafka, + "org.apache.kafka.streams.tests.StaticMemberTestClient", + "") + self.INPUT_TOPIC = None + self.GROUP_INSTANCE_ID = group_instance_id + self.NUM_THREADS = num_threads + def prop_file(self): + properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT, + streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(), + streams_property.NUM_THREADS: self.NUM_THREADS, + consumer_property.GROUP_INSTANCE_ID: self.GROUP_INSTANCE_ID, + consumer_property.SESSION_TIMEOUT_MS: 60000} + + properties['input.topic'] = self.INPUT_TOPIC + + cfg = KafkaConfig(**properties) + return cfg.render() diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/services/streams_property.py index 054ea64..99f0ece 100644 --- a/tests/kafkatest/services/streams_property.py +++ b/tests/kafkatest/services/streams_property.py @@ -20,7 +20,3 @@ Define Streams configuration property names here. STATE_DIR = "state.dir" KAFKA_SERVERS = "bootstrap.servers" NUM_THREADS = "num.stream.threads" - - - - diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py index b9894ee..1fcdd5f 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py @@ -13,14 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsNamedRepartitionTopicService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService - +from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running class StreamsNamedRepartitionTopicTest(Test): """ @@ -32,6 +30,7 @@ class StreamsNamedRepartitionTopicTest(Test): input_topic = 'inputTopic' aggregation_topic = 'aggregationTopic' pattern = 'AGGREGATED' + stopped_message = 'NAMED_REPARTITION_TEST Streams Stopped' def __init__(self, test_context): super(StreamsNamedRepartitionTopicTest, self).__init__(test_context) @@ -66,43 +65,25 @@ class StreamsNamedRepartitionTopicTest(Test): for processor in processors: processor.CLEAN_NODE_ENABLED = False self.set_topics(processor) - self.verify_running(processor, 'REBALANCING -> RUNNING') + verify_running(processor, 'REBALANCING -> RUNNING') self.verify_processing(processors) # do rolling upgrade for processor in processors: - self.verify_stopped(processor) + verify_stopped(processor, self.stopped_message) # will tell app to add operations before repartition topic processor.ADD_ADDITIONAL_OPS = 'true' - self.verify_running(processor, 'UPDATED Topology') + verify_running(processor, 'UPDATED Topology') self.verify_processing(processors) - self.stop_processors(processors) + stop_processors(processors, self.stopped_message) self.producer.stop() self.kafka.stop() self.zookeeper.stop() - @staticmethod - def verify_running(processor, message): - node = processor.node - with node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.start() - monitor.wait_until(message, - timeout_sec=60, - err_msg="Never saw '%s' message " % message + str(processor.node.account)) - - @staticmethod - def verify_stopped(processor): - node = processor.node - with node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.stop() - monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped', - timeout_sec=60, - err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account)) - def verify_processing(self, processors): for processor in processors: with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: @@ -110,10 +91,6 @@ class StreamsNamedRepartitionTopicTest(Test): timeout_sec=60, err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account)) - def stop_processors(self, processors): - for processor in processors: - self.verify_stopped(processor) - def set_topics(self, processor): processor.INPUT_TOPIC = self.input_topic processor.AGGREGATION_TOPIC = self.aggregation_topic diff --git a/tests/kafkatest/tests/streams/streams_optimized_test.py b/tests/kafkatest/tests/streams/streams_optimized_test.py index 31efc0d..ecd84c2 100644 --- a/tests/kafkatest/tests/streams/streams_optimized_test.py +++ b/tests/kafkatest/tests/streams/streams_optimized_test.py @@ -15,12 +15,11 @@ import time from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsOptimizedUpgradeTestService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService - +from kafkatest.tests.streams.utils import stop_processors class StreamsOptimizedTest(Test): """ @@ -33,6 +32,7 @@ class StreamsOptimizedTest(Test): reduce_topic = 'reduceTopic' join_topic = 'joinTopic' operation_pattern = 'AGGREGATED\|REDUCED\|JOINED' + stopped_message = 'OPTIMIZE_TEST Streams Stopped' def __init__(self, test_context): super(StreamsOptimizedTest, self).__init__(test_context) @@ -75,7 +75,7 @@ class StreamsOptimizedTest(Test): self.verify_processing(processors, verify_individual_operations=False) - self.stop_processors(processors) + stop_processors(processors, self.stopped_message) # start again with topology optimized for processor in processors: @@ -84,7 +84,7 @@ class StreamsOptimizedTest(Test): self.verify_processing(processors, verify_individual_operations=True) - self.stop_processors(processors) + stop_processors(processors, self.stopped_message) self.producer.stop() self.kafka.stop() @@ -100,15 +100,6 @@ class StreamsOptimizedTest(Test): err_msg="Never saw 'REBALANCING -> RUNNING with REPARTITION TOPIC COUNT=%s' message " % repartition_topic_count + str(processor.node.account)) - @staticmethod - def verify_stopped(processor): - node = processor.node - with node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.stop() - monitor.wait_until('OPTIMIZE_TEST Streams Stopped', - timeout_sec=60, - err_msg="'OPTIMIZE_TEST Streams Stopped' message" + str(processor.node.account)) - def verify_processing(self, processors, verify_individual_operations): for processor in processors: if not self.all_source_subtopology_tasks(processor): @@ -139,10 +130,6 @@ class StreamsOptimizedTest(Test): return False - def stop_processors(self, processors): - for processor in processors: - self.verify_stopped(processor) - def set_topics(self, processor): processor.INPUT_TOPIC = self.input_topic processor.AGGREGATION_TOPIC = self.aggregation_topic diff --git a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py similarity index 53% copy from tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py copy to tests/kafkatest/tests/streams/streams_static_membership_test.py index b9894ee..a466ea8 100644 --- a/tests/kafkatest/tests/streams/streams_named_repartition_topic_test.py +++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py @@ -13,31 +13,28 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time from ducktape.tests.test import Test -from ducktape.utils.util import wait_until from kafkatest.services.kafka import KafkaService -from kafkatest.services.streams import StreamsNamedRepartitionTopicService +from kafkatest.services.streams import StaticMemberTestService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs - -class StreamsNamedRepartitionTopicTest(Test): +class StreamsStaticMembershipTest(Test): """ - Tests using a named repartition topic by starting - application then doing a rolling upgrade with added - operations and the application still runs + Tests using static membership when broker points to minimum supported + version (2.3) or higher. """ input_topic = 'inputTopic' - aggregation_topic = 'aggregationTopic' - pattern = 'AGGREGATED' + pattern = 'PROCESSED' + running_message = 'REBALANCING -> RUNNING' + stopped_message = 'Static membership test closed' def __init__(self, test_context): - super(StreamsNamedRepartitionTopicTest, self).__init__(test_context) + super(StreamsStaticMembershipTest, self).__init__(test_context) self.topics = { - self.input_topic: {'partitions': 6}, - self.aggregation_topic: {'partitions': 6} + self.input_topic: {'partitions': 18}, } self.zookeeper = ZookeeperService(self.test_context, num_nodes=1) @@ -51,13 +48,14 @@ class StreamsNamedRepartitionTopicTest(Test): throughput=1000, acks=1) - def test_upgrade_topology_with_named_repartition_topic(self): + def test_rolling_bounces_will_not_trigger_rebalance_under_static_membership(self): self.zookeeper.start() self.kafka.start() - processor1 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) - processor2 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) - processor3 = StreamsNamedRepartitionTopicService(self.test_context, self.kafka) + numThreads = 3 + processor1 = StaticMemberTestService(self.test_context, self.kafka, "consumer-A", numThreads) + processor2 = StaticMemberTestService(self.test_context, self.kafka, "consumer-B", numThreads) + processor3 = StaticMemberTestService(self.test_context, self.kafka, "consumer-C", numThreads) processors = [processor1, processor2, processor3] @@ -66,43 +64,39 @@ class StreamsNamedRepartitionTopicTest(Test): for processor in processors: processor.CLEAN_NODE_ENABLED = False self.set_topics(processor) - self.verify_running(processor, 'REBALANCING -> RUNNING') + verify_running(processor, self.running_message) self.verify_processing(processors) - # do rolling upgrade + # do several rolling bounces + num_bounces = 3 + for i in range(0, num_bounces): + for processor in processors: + verify_stopped(processor, self.stopped_message) + verify_running(processor, self.running_message) + + stable_generation = -1 for processor in processors: - self.verify_stopped(processor) - # will tell app to add operations before repartition topic - processor.ADD_ADDITIONAL_OPS = 'true' - self.verify_running(processor, 'UPDATED Topology') + generations = extract_generation_from_logs(processor) + num_bounce_generations = num_bounces * numThreads + assert num_bounce_generations <= len(generations), \ + "Smaller than minimum expected %d generation messages, actual %d" % (num_bounce_generations, len(generations)) + + for generation in generations[-num_bounce_generations:]: + generation = int(generation) + if stable_generation == -1: + stable_generation = generation + assert stable_generation == generation, \ + "Stream rolling bounce have caused unexpected generation bump %d" % generation self.verify_processing(processors) - self.stop_processors(processors) + stop_processors(processors, self.stopped_message) self.producer.stop() self.kafka.stop() self.zookeeper.stop() - @staticmethod - def verify_running(processor, message): - node = processor.node - with node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.start() - monitor.wait_until(message, - timeout_sec=60, - err_msg="Never saw '%s' message " % message + str(processor.node.account)) - - @staticmethod - def verify_stopped(processor): - node = processor.node - with node.account.monitor_log(processor.STDOUT_FILE) as monitor: - processor.stop() - monitor.wait_until('NAMED_REPARTITION_TEST Streams Stopped', - timeout_sec=60, - err_msg="'NAMED_REPARTITION_TEST Streams Stopped' message" + str(processor.node.account)) - def verify_processing(self, processors): for processor in processors: with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor: @@ -110,10 +104,5 @@ class StreamsNamedRepartitionTopicTest(Test): timeout_sec=60, err_msg="Never saw processing of %s " % self.pattern + str(processor.node.account)) - def stop_processors(self, processors): - for processor in processors: - self.verify_stopped(processor) - def set_topics(self, processor): processor.INPUT_TOPIC = self.input_topic - processor.AGGREGATION_TOPIC = self.aggregation_topic diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index 37ab770..6a6a8bc 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -23,6 +23,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \ StreamsUpgradeTestJobRunnerService from kafkatest.services.zookeeper import ZookeeperService +from kafkatest.tests.streams.utils import extract_generation_from_logs from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ LATEST_2_0, LATEST_2_1, LATEST_2_2, DEV_BRANCH, DEV_VERSION, KafkaVersion @@ -45,7 +46,7 @@ anyone can verify that by calling curl https://s3-us-west-2.amazonaws.com/kafka-packages/kafka_$scala_version-$version.tgz to download the jar and if it is not uploaded yet, ping the dev@kafka mailing list to request it being uploaded. -This test needs to get updated, but this requires several steps +This test needs to get updated, but this requires several steps, which are outlined here: 1. Update all relevant versions in tests/kafkatest/version.py this will include adding a new version for the new @@ -57,17 +58,17 @@ which are outlined here: during the system test run. 3. Update the vagrant/bash.sh file to include all new versions, including the newly released version - and all point releases for existing releases. You only need to list the latest version in + and all point releases for existing releases. You only need to list the latest version in this file. 4. Then update all relevant versions in the tests/docker/Dockerfile -5. Add a new "upgrade-system-tests-XXXX module under streams. You can probably just copy the - latest system test module from the last release. Just make sure to update the systout print - statement in StreamsUpgradeTest to the version for the release. After you add the new module +5. Add a new upgrade-system-tests-XXXX module under streams. You can probably just copy the + latest system test module from the last release. Just make sure to update the systout print + statement in StreamsUpgradeTest to the version for the release. After you add the new module you'll need to update settings.gradle file to include the name of the module you just created - for gradle to recognize the newly added module - + for gradle to recognize the newly added module. + 6. Then you'll need to update any version changes in gradle/dependencies.gradle """ @@ -598,9 +599,9 @@ class StreamsUpgradeTest(Test): retries = 0 while retries < 10: - processor_found = self.extract_generation_from_logs(processor) - first_other_processor_found = self.extract_generation_from_logs(first_other_processor) - second_other_processor_found = self.extract_generation_from_logs(second_other_processor) + processor_found = extract_generation_from_logs(processor) + first_other_processor_found = extract_generation_from_logs(first_other_processor) + second_other_processor_found = extract_generation_from_logs(second_other_processor) if len(processor_found) > 0 and len(first_other_processor_found) > 0 and len(second_other_processor_found) > 0: self.logger.info("processor: " + str(processor_found)) @@ -632,9 +633,6 @@ class StreamsUpgradeTest(Test): return current_generation - def extract_generation_from_logs(self, processor): - return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True)) - def extract_highest_generation(self, found_generations): return int(found_generations[-1]) diff --git a/tests/kafkatest/services/streams_property.py b/tests/kafkatest/tests/streams/utils/__init__.py similarity index 83% copy from tests/kafkatest/services/streams_property.py copy to tests/kafkatest/tests/streams/utils/__init__.py index 054ea64..6d2957f 100644 --- a/tests/kafkatest/services/streams_property.py +++ b/tests/kafkatest/tests/streams/utils/__init__.py @@ -13,14 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -""" -Define Streams configuration property names here. -""" - -STATE_DIR = "state.dir" -KAFKA_SERVERS = "bootstrap.servers" -NUM_THREADS = "num.stream.threads" - - - - +from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs diff --git a/tests/kafkatest/tests/streams/utils/util.py b/tests/kafkatest/tests/streams/utils/util.py new file mode 100644 index 0000000..683b199 --- /dev/null +++ b/tests/kafkatest/tests/streams/utils/util.py @@ -0,0 +1,36 @@ +# Copyright 2015 Confluent Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +def verify_running(processor, message): + node = processor.node + with node.account.monitor_log(processor.STDOUT_FILE) as monitor: + processor.start() + monitor.wait_until(message, + timeout_sec=60, + err_msg="Never saw '%s' message " % message + str(processor.node.account)) + +def verify_stopped(processor, message): + node = processor.node + with node.account.monitor_log(processor.STDOUT_FILE) as monitor: + processor.stop() + monitor.wait_until(message, + timeout_sec=60, + err_msg="'%s' message " % message + str(processor.node.account)) + +def stop_processors(processors, stopped_message): + for processor in processors: + verify_stopped(processor, stopped_message) + +def extract_generation_from_logs(processor): + return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))