This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d595fee2 KAFKA-13231; `TransactionalMessageCopier.start_node` should
wait until the process if fully started (#11264)
d595fee2 is described below
commit d595fee20ef935a3b3fb4417b7268a21995bd8ff
Author: David Jacot <[email protected]>
AuthorDate: Fri Aug 27 08:28:14 2021 +0200
KAFKA-13231; `TransactionalMessageCopier.start_node` should wait until the
process if fully started (#11264)
This patch ensures that the transaction message copier is fully started in
`start_node`. Without this, it is possible that `stop_node` is called before
the process is started which results in not stopping it at all.
Reviewers: Jason Gustafson <[email protected]>
---
tests/kafkatest/services/transactional_message_copier.py | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/tests/kafkatest/services/transactional_message_copier.py
b/tests/kafkatest/services/transactional_message_copier.py
index bea86a6..0717463 100644
--- a/tests/kafkatest/services/transactional_message_copier.py
+++ b/tests/kafkatest/services/transactional_message_copier.py
@@ -159,12 +159,16 @@ class TransactionalMessageCopier(KafkaPathResolverMixin,
BackgroundThreadService
def alive(self, node):
return len(self.pids(node)) > 0
+ def start_node(self, node):
+ BackgroundThreadService.start_node(self, node)
+ wait_until(lambda: self.alive(node), timeout_sec=60, err_msg="Node %s:
Message Copier failed to start" % str(node.account))
+
def kill_node(self, node, clean_shutdown=True):
pids = self.pids(node)
sig = signal.SIGTERM if clean_shutdown else signal.SIGKILL
for pid in pids:
node.account.signal(pid, sig)
- wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60,
err_msg="Message Copier failed to stop")
+ wait_until(lambda: len(self.pids(node)) == 0, timeout_sec=60,
err_msg="Node %s: Message Copier failed to stop" % str(node.account))
def stop_node(self, node, clean_shutdown=True):
self.kill_node(node, clean_shutdown)