This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new 8a515da2c82 KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATION may trigger a tight loop with MANY logs (#19394)
8a515da2c82 is described below

commit 8a515da2c824e6e10c00f938aa9d941076e9cd82
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Thu Apr 17 11:35:43 2025 +0800

    KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATION may trigger a tight loop with MANY logs (#19394)
    
    When a Kafka Streams uncaught exception handler returns
    `SHUTDOWN_APPLICATION`, `StreamThread.maybeSendShutdown()` can be
    invoked in a tight loop during shutdown, flooding the logs with the
    same warning. This PR rate-limits that warning so that each stream
    thread emits it at most once every 10 seconds; the rebalance request
    itself is still sent on every call.
    
    Reviewers: PoAn Yang <[email protected]>, Matthias J. Sax <[email protected]>
---
 .../StreamsUncaughtExceptionHandlerIntegrationTest.java   | 15 ++++++++++++---
 .../kafka/streams/processor/internals/StreamThread.java   | 11 +++++++++--
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 91ae7748f01..b6da54c11b3 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -23,6 +23,8 @@ import 
org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -39,10 +41,12 @@ import org.apache.kafka.streams.kstream.Named;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.TestUtils;
 
+import org.apache.logging.log4j.Level;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
@@ -334,11 +338,14 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
 
         final Topology topology = builder.build();
-
-        try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, 
properties);
-             final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, 
properties)) {
+        final MockTime time = new MockTime(0L);
+        
+        try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, 
properties, time);
+             final KafkaStreams kafkaStreams2 = new KafkaStreams(topology, 
properties, time);
+             final LogCaptureAppender logCaptureAppender = 
LogCaptureAppender.createAndRegister()) {
             kafkaStreams1.setUncaughtExceptionHandler(exception -> 
SHUTDOWN_APPLICATION);
             kafkaStreams2.setUncaughtExceptionHandler(exception -> 
SHUTDOWN_APPLICATION);
+            logCaptureAppender.setClassLogger(StreamThread.class, Level.WARN);
 
             startApplicationAndWaitUntilRunning(asList(kafkaStreams1, 
kafkaStreams2));
 
@@ -346,6 +353,8 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
             waitForApplicationState(asList(kafkaStreams1, kafkaStreams2), 
KafkaStreams.State.ERROR, DEFAULT_DURATION);
 
             assertThat(processorValueCollector.size(), equalTo(1));
+            assertThat("Shutdown warning log message should be exported 
exactly once",
+                    logCaptureAppender.getMessages("WARN").stream().filter(msg 
-> msg.contains("Detected that shutdown was requested")).count(), equalTo(1L));
         }
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f505fab1554..2a83f0b6123 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -346,6 +346,7 @@ public class StreamThread extends Thread implements 
ProcessingThread {
     // These are used to signal from outside the stream thread, but the 
variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+    private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
     private final boolean eosEnabled;
     private final boolean stateUpdaterEnabled;
     private final boolean processingThreadsEnabled;
@@ -869,8 +870,14 @@ public class StreamThread extends Thread implements 
ProcessingThread {
 
     public void maybeSendShutdown() {
         if (assignmentErrorCode.get() == 
AssignorError.SHUTDOWN_REQUESTED.code()) {
-            log.warn("Detected that shutdown was requested. " +
-                    "All clients in this app will now begin to shutdown");
+            final long now = time.milliseconds();
+            final long lastLogged = lastShutdownWarningTimestamp.get();
+            if (now - lastLogged >= 10_000L) {
+                if (lastShutdownWarningTimestamp.compareAndSet(lastLogged, 
now)) {
+                    log.warn("Detected that shutdown was requested. " +
+                            "All clients in this app will now begin to 
shutdown");
+                }
+            }
             mainConsumer.enforceRebalance("Shutdown requested");
         }
     }

Reply via email to