chirag-wadhwa5 commented on code in PR #18209:
URL: https://github.com/apache/kafka/pull/18209#discussion_r1919795494


##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ConsumerState:
+    Started = 1
+    Dead = 2
+
+class ShareConsumerEventHandler(object):
+
+    def __init__(self, node, idx, state=ConsumerState.Dead):
+        self.node = node
+        self.idx = idx
+        self.total_consumed = 0
+        self.total_acknowledged = 0
+        self.total_acknowledged_failed = 0
+        self.consumed_per_partition = {}
+        self.acknowledged_per_partition = {}
+        self.acknowledged_per_partition_failed = {}
+        self.state = state
+
+    def handle_shutdown_complete(self, node=None, logger=None):
+        self.state = ConsumerState.Dead
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
+        self.state = ConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)
+
+    def handle_offsets_acknowledged(self, event, node, logger):
+        if event["success"]:
+            self.total_acknowledged += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+        else:
+            self.total_acknowledged_failed += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition_failed[topic_partition] = self.acknowledged_per_partition_failed.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offset acknowledgement failed for: %s" % (node.account.hostname))
+
+    def handle_records_consumed(self, event, node, logger):
+        self.total_consumed += event["count"]
+        for share_partition_data in event["partitions"]:
+            topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+            self.consumed_per_partition[topic_partition] = self.consumed_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+        logger.debug("Offsets consumed for %s" % (node.account.hostname))
+
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the consumer
+        if not clean_shutdown:
+            self.handle_shutdown_complete()
+
+class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
+    """This service wraps org.apache.kafka.tools.VerifiableShareConsumer for use in
+    system testing.
+
+    NOTE: this class should be treated as a PUBLIC API. Downstream users use
+    this service both directly and through class extension, so care must be
+    taken to ensure compatibility.
+    """
+
+    PERSISTENT_ROOT = "/mnt/verifiable_share_consumer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "verifiable_share_consumer.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.properties")
+
+    logs = {
+            "verifiable_share_consumer_stdout": {
+                "path": STDOUT_CAPTURE,
+                "collect_default": False},
+            "verifiable_share_consumer_stderr": {
+                "path": STDERR_CAPTURE,
+                "collect_default": False},
+            "verifiable_share_consumer_log": {
+                "path": LOG_FILE,
+                "collect_default": True}
+            }
+
+    def __init__(self, context, num_nodes, kafka, topic, group_id,
+                 max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="",
+                 version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None,
+                 on_record_consumed=None):
+        """
+        :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
+        """
+        super(VerifiableShareConsumer, self).__init__(context, num_nodes)
+        self.log_level = log_level
+        self.kafka = kafka
+        self.topic = topic
+        self.group_id = group_id
+        self.offset_reset_strategy = offset_reset_strategy
+        self.max_messages = max_messages
+        self.acknowledgement_mode = acknowledgement_mode
+        self.prop_file = ""
+        self.stop_timeout_sec = stop_timeout_sec
+        self.on_record_consumed = on_record_consumed
+
+        self.event_handlers = {}
+        self.jaas_override_variables = jaas_override_variables or {}
+
+        self.total_records_consumed = 0
+        self.total_records_acknowledged = 0
+        self.total_records_acknowledged_failed = 0
+        self.consumed_records_offsets = set()
+        # self.consumed_more_than_once = []
+
+        self.acknowledged_records_offsets = set()
+        # self.acknowledged_more_than_once = []
+        self.is_offset_reset_strategy_set = False
+
+        for node in self.nodes:
+            node.version = version
+
+    def java_class_name(self):
+        return "VerifiableShareConsumer"
+
+    def create_event_handler(self, idx, node):
+        return ShareConsumerEventHandler(node, idx)
+
+    def _worker(self, idx, node):
+        with self.lock:
+            self.event_handlers[node] = self.create_event_handler(idx, node)
+            handler = self.event_handlers[node]
+
+        node.account.ssh("mkdir -p %s" % VerifiableShareConsumer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=VerifiableShareConsumer.LOG_FILE)

Review Comment:
   Hi, I have made the required changes in the latest commit, thanks!
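
For readers skimming the patch, here is a minimal sketch of how a ducktape system test might drive this new service. It is illustrative only: it assumes a test class that provisions a running KafkaService as self.kafka, the topic name, share group name, and message count are made up, and it relies only on the constructor parameters and the total_records_consumed attribute visible in the hunk above, plus the standard ducktape start()/stop() lifecycle and wait_until helper.

# Hypothetical smoke test; everything below is a sketch, not part of the patch.
from ducktape.tests.test import Test
from ducktape.utils.util import wait_until

from kafkatest.services.verifiable_share_consumer import VerifiableShareConsumer


class ShareConsumerSmokeTest(Test):
    """Hypothetical test class; assumes self.kafka is a running KafkaService set up elsewhere."""

    def test_share_consumption(self):
        # Construct the service using only parameters shown in the hunk above.
        consumer = VerifiableShareConsumer(
            context=self.test_context,
            num_nodes=1,
            kafka=self.kafka,              # assumed running KafkaService
            topic="share_test_topic",      # illustrative topic name
            group_id="share_test_group",   # illustrative share group name
            max_messages=1000,
            acknowledgement_mode="auto")
        consumer.start()   # BackgroundThreadService runs _worker on each node

        # total_records_consumed is initialised in __init__ above; the code that
        # keeps it up to date lies outside the lines shown in this hunk.
        wait_until(lambda: consumer.total_records_consumed >= 1000,
                   timeout_sec=60,
                   err_msg="Share consumer did not consume the expected records")
        consumer.stop()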


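Similarly, a small self-contained illustration of the per-partition bookkeeping that ShareConsumerEventHandler.handle_records_consumed performs. The event field names (count, partitions, topic, partition) are taken from the dictionary accesses in the hunk above; the concrete values, the stub node object, and the demo logger are invented, and it assumes the kafkatest package (including this new module) is importable.

# Standalone demo of the aggregation in ShareConsumerEventHandler;
# field names mirror the handler code above, values are invented.
import logging
from types import SimpleNamespace

from kafkatest.services.kafka import TopicPartition
from kafkatest.services.verifiable_share_consumer import ShareConsumerEventHandler

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("share_consumer_handler_demo")

# Minimal stand-in for a ducktape cluster node; the handler only reads account.hostname.
node = SimpleNamespace(account=SimpleNamespace(hostname="worker1"))

handler = ShareConsumerEventHandler(node, idx=1)

# An event shaped like the ones handle_records_consumed parses: a total count
# plus per-partition counts.
event = {
    "count": 7,
    "partitions": [
        {"topic": "share_test_topic", "partition": 0, "count": 4},
        {"topic": "share_test_topic", "partition": 1, "count": 3},
    ],
}

handler.handle_records_consumed(event, node, logger)

assert handler.total_consumed == 7
assert handler.consumed_per_partition[TopicPartition("share_test_topic", 0)] == 4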

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
