mjsax commented on code in PR #19394:
URL: https://github.com/apache/kafka/pull/19394#discussion_r2043211983
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -869,8 +870,10 @@ public void setStreamsUncaughtExceptionHandler(final
BiConsumer<Throwable, Boole
public void maybeSendShutdown() {
if (assignmentErrorCode.get() ==
AssignorError.SHUTDOWN_REQUESTED.code()) {
- log.warn("Detected that shutdown was requested. " +
+ if (shutDownRequested.compareAndSet(false, true)) {
+ log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
+ }
mainConsumer.enforceRebalance("Shutdown requested");
Review Comment:
Thanks for the PR -- given that we re-send this signal, I am wondering if we
should keep logging the message on a regular basis, eg, every 10 seconds or
something like this? Instead of a `AtomicBoolean` we could use an `AtomicLong`
(initialize as zero) and keep logging the WARN whenever 10 seconds passed, and
update the `AtomicLong` after we logged with the current timestamp?
Thoughts?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]