This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 0aaca79  KAFKA-10505: Fix parsing of generation log string. (#9312)
0aaca79 is described below

commit 0aaca796cf8222475f5573e8ca077a6f6367e8c0
Author: Nikolay <nizhi...@apache.org>
AuthorDate: Thu Sep 24 00:24:02 2020 +0300

    KAFKA-10505: Fix parsing of generation log string. (#9312)
    
    Reviewers: A. Sophie Blee-Goldman <sop...@confluent.io>, Matthias J. Sax <mj...@apache.org>, Guozhang Wang <wangg...@gmail.com>
---
 tests/kafkatest/tests/streams/streams_static_membership_test.py | 4 ++--
 tests/kafkatest/tests/streams/streams_upgrade_test.py           | 4 ++--
 tests/kafkatest/tests/streams/utils/__init__.py                 | 2 +-
 tests/kafkatest/tests/streams/utils/util.py                     | 9 +++++++++
 4 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/tests/kafkatest/tests/streams/streams_static_membership_test.py b/tests/kafkatest/tests/streams/streams_static_membership_test.py
index a466ea8..e6072f4 100644
--- a/tests/kafkatest/tests/streams/streams_static_membership_test.py
+++ b/tests/kafkatest/tests/streams/streams_static_membership_test.py
@@ -18,7 +18,7 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StaticMemberTestService
 from kafkatest.services.verifiable_producer import VerifiableProducer
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs
+from kafkatest.tests.streams.utils import verify_stopped, stop_processors, verify_running, extract_generation_from_logs, extract_generation_id
 
 class StreamsStaticMembershipTest(Test):
     """
@@ -83,7 +83,7 @@ class StreamsStaticMembershipTest(Test):
                 "Smaller than minimum expected %d generation messages, actual %d" % (num_bounce_generations, len(generations))
 
             for generation in generations[-num_bounce_generations:]:
-                generation = int(generation)
+                generation = extract_generation_id(generation)
                 if stable_generation == -1:
                     stable_generation = generation
                 assert stable_generation == generation, \
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index e064bbe..2065a66 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -23,7 +23,7 @@ from kafkatest.services.kafka import KafkaService
 from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService, \
     StreamsUpgradeTestJobRunnerService
 from kafkatest.services.zookeeper import ZookeeperService
-from kafkatest.tests.streams.utils import extract_generation_from_logs
+from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
     LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
@@ -534,7 +534,7 @@ class StreamsUpgradeTest(Test):
         return current_generation
 
     def extract_highest_generation(self, found_generations):
-        return int(found_generations[-1])
+        return extract_generation_id(found_generations[-1])
 
     def verify_metadata_no_upgraded_yet(self):
         for p in self.processors:
diff --git a/tests/kafkatest/tests/streams/utils/__init__.py b/tests/kafkatest/tests/streams/utils/__init__.py
index 6d2957f..ed36ca2 100644
--- a/tests/kafkatest/tests/streams/utils/__init__.py
+++ b/tests/kafkatest/tests/streams/utils/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs
+from util import verify_running, verify_stopped, stop_processors, extract_generation_from_logs, extract_generation_id
diff --git a/tests/kafkatest/tests/streams/utils/util.py b/tests/kafkatest/tests/streams/utils/util.py
index 683b199..7bec20c 100644
--- a/tests/kafkatest/tests/streams/utils/util.py
+++ b/tests/kafkatest/tests/streams/utils/util.py
@@ -11,6 +11,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import re
+
 
 def verify_running(processor, message):
     node = processor.node
@@ -34,3 +36,10 @@ def stop_processors(processors, stopped_message):
 
 def extract_generation_from_logs(processor):
     return list(processor.node.account.ssh_capture("grep \"Successfully joined group with generation\" %s| awk \'{for(i=1;i<=NF;i++) {if ($i == \"generation\") beginning=i+1; if($i== \"(org.apache.kafka.clients.consumer.internals.AbstractCoordinator)\") ending=i }; for (j=beginning;j<ending;j++) printf $j; printf \"\\n\"}\'" % processor.LOG_FILE, allow_fail=True))
+
+def extract_generation_id(generation):
+    # Generation string looks like
+    # "Generation{generationId=5,memberId='consumer-A-3-72d7be15-bcdd-4032-b247-784e648d4dd8',protocol='stream'} "
+    # Extracting generationId from it.
+    m = re.search(r'Generation{generationId=(\d+),.*', generation)
+    return int(m.group(1))

Reply via email to