This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch 2.6 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push: new 0aaca79 KAFKA-10505: Fix parsing of generation log string. (#9312) 0aaca79 is described below commit 0aaca796cf8222475f5573e8ca077a6f6367e8c0 Author: Nikolay <nizhi...@apache.org> AuthorDate: Thu Sep 24 00:24:02 2020 +0300 KAFKA-10505: Fix parsing of generation log string. (#9312) Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wangg...@gmail.com> --- tests/kafkatest/tests/streams/streams_static_membership_test.py | 4 ++-- tests/kafkatest/tests/streams/streams_upgrade_test.py | 4 ++-- tests/kafkatest/tests/streams/utils/__init__.py | 2 +- tests/kafkatest/tests/streams/utils/util.py | 9 +++++++++ 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py index a466ea8..e6072f4 100644 --- a/tests/kafkatest/tests/streams/streams_static_membership_test.py +++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py @@ -18,7 +18,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StaticMemberTestService from kafkatest.services.verifiable_producer import VerifiableProducer from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs +from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs, extract_generation_id class StreamsStaticMembershipTest(Test): """ @@ -83,7 +83,7 @@ class StreamsStaticMembershipTest(Test): "Smaller than minimum expected %d generation messages, actual %d" % (num_bounce_generations, len(generations)) for generation in generations[-num_bounce_generations:]: - generation = int(generation) + generation = extract_generation_id(generation) if stable_generation == -1: stable_generation = generation assert stable_generation == generation, \ diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py index e064bbe..2065a66 100644 --- a/tests/kafkatest/tests/streams/streams_upgrade_test.py +++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py @@ -23,7 +23,7 @@ from kafkatest.services.kafka import KafkaService from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \ StreamsUpgradeTestJobRunnerService from kafkatest.services.zookeeper import ZookeeperService -from kafkatest.tests.streams.utils import extract_generation_from_logs +from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \ LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, DEV_VERSION, KafkaVersion @@ -534,7 +534,7 @@ class StreamsUpgradeTest(Test): return current_generation def extract_highest_generation(self, found_generations): - return int(found_generations[-1]) + return extract_generation_id(found_generations[-1]) def verify_metadata_no_upgraded_yet(self): for p in self.processors: diff --git a/tests/kafkatest/tests/streams/utils/__init__.py b/tests/kafkatest/tests/streams/utils/__init__.py index 6d2957f..ed36ca2 100644 --- a/tests/kafkatest/tests/streams/utils/__init__.py +++ b/tests/kafkatest/tests/streams/utils/__init__.py @@ -13,4 +13,4 @@ # See the License for the specific language governing permissions and # limitations under the License. -from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs +from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id diff --git a/tests/kafkatest/tests/streams/utils/util.py b/tests/kafkatest/tests/streams/utils/util.py index 683b199..7bec20c 100644 --- a/tests/kafkatest/tests/streams/utils/util.py +++ b/tests/kafkatest/tests/streams/utils/util.py @@ -11,6 +11,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import re + def verify_running(processor, message): node = processor.node @@ -34,3 +36,10 @@ def stop_processors(processors, stopped_message): def extract_generation_from_logs(processor): return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True)) + +def extract_generation_id(generation): + # Generation string looks like + # "Generation{generationId=5,memberId='consumer-A-3-72d7be15-bcdd-4032-b247-784e648d4dd8',protocol='stream'} " + # Extracting generationId from it. + m = re.search(r'Generation{generationId=(\d+),.*', generation) + return int(m.group(1))