chirag-wadhwa5 commented on code in PR #18209:
URL: https://github.com/apache/kafka/pull/18209#discussion_r1918820386


##########
tests/kafkatest/tests/verifiable_share_consumer_test.py:
##########
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.verifiable_share_consumer import 
VerifiableShareConsumer
+from kafkatest.services.kafka import TopicPartition
+
+class VerifiableShareConsumerTest(KafkaTest):
+    PRODUCER_REQUEST_TIMEOUT_SEC = 30
+
+    def __init__(self, test_context, num_consumers=1, num_producers=0, 
**kwargs):
+        super(VerifiableShareConsumerTest, self).__init__(test_context, 
**kwargs)
+        self.num_consumers = num_consumers
+        self.num_producers = num_producers
+
+    def _all_partitions(self, topic, num_partitions):
+        partitions = set()
+        for i in range(num_partitions):
+            partitions.add(TopicPartition(topic=topic, partition=i))
+        return partitions
+
+    def _partitions(self, assignment):
+        partitions = []
+        for parts in assignment.values():
+            partitions += parts
+        return partitions
+
+    def valid_assignment(self, topic, num_partitions, assignment):
+        all_partitions = self._all_partitions(topic, num_partitions)
+        partitions = self._partitions(assignment)
+        return len(partitions) == num_partitions and set(partitions) == 
all_partitions
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the 
constructor"""
+        return super(VerifiableShareConsumerTest, self).min_cluster_size() + 
self.num_consumers + self.num_producers
+
+    def setup_share_group(self, topic, acknowledgement_mode="auto", 
group_id="test_group_id", offset_reset_strategy="", **kwargs):
+        return VerifiableShareConsumer(self.test_context, self.num_consumers, 
self.kafka,
+                                  topic, group_id, 
acknowledgement_mode=acknowledgement_mode,
+                                  offset_reset_strategy=offset_reset_strategy, 
log_level="TRACE", **kwargs)
+
+    def setup_producer(self, topic, max_messages=-1, throughput=500):
+        return VerifiableProducer(self.test_context, self.num_producers, 
self.kafka, topic,
+                                  max_messages=max_messages, 
throughput=throughput,
+                                  
request_timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC,
+                                  log_level="DEBUG")
+
+    def await_produced_messages(self, producer, min_messages=1000, 
timeout_sec=10):
+        current_acked = producer.num_acked
+        wait_until(lambda: producer.num_acked >= current_acked + min_messages, 
timeout_sec=timeout_sec,
+                   err_msg="Timeout awaiting messages to be produced and 
acked")
+
+    def await_consumed_messages(self, consumer, min_messages=1, 
timeout_sec=10, total=False):
+        current_total = 0
+        if total is False:
+            current_total = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() >= current_total + 
min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+
+    def await_consumed_messages_by_a_consumer(self, consumer, node, 
min_messages=1, timeout_sec=10, total=False):

Review Comment:
   Thanks for the review. If you seethe usage of 
`await_consumed_messages_by_a_consumer`, it is used in the methods 
`rolling_bounce_share_consumers` and `bounce_all_share_consumers`. 
`await_consumed_messages` waits for consumption of messages, that could be from 
any of the share consumers in the group, while 
`await_consumed_messages_by_a_consumer` provides a specific node and waits for 
that particular consume to consume some messages. So, in the tests where share 
consumers are bounced (restarted), this gives us a guarantee that the consumer 
has started successfully and is consuming messages after the restart.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to