This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.0 by this push:
new 8a515da2c82 KAFKA-19054: StreamThread exception handling with
SHUTDOWN_APPLICATION may trigger a tight loop with MANY logs (#19394)
8a515da2c82 is described below
commit 8a515da2c824e6e10c00f938aa9d941076e9cd82
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Thu Apr 17 11:35:43 2025 +0800
KAFKA-19054: StreamThread exception handling with SHUTDOWN_APPLICATION may
trigger a tight loop with MANY logs (#19394)
Under the `SHUTDOWN_APPLICATION` configuration in Kafka Streams, a tight
loop in the shutdown process can flood logs with repeated messages. This
PR introduces a check to ensure that the shutdown log is emitted only
once every 10 seconds, thereby preventing log flooding.
Reviewers: PoAn Yang <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../StreamsUncaughtExceptionHandlerIntegrationTest.java | 15 ++++++++++++---
.../kafka/streams/processor/internals/StreamThread.java | 11 +++++++++--
2 files changed, 21 insertions(+), 5 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 91ae7748f01..b6da54c11b3 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -23,6 +23,8 @@ import
org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
+import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
@@ -39,10 +41,12 @@ import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.TestUtils;
+import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -334,11 +338,14 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
final Topology topology = builder.build();
-
- try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology,
properties);
- final KafkaStreams kafkaStreams2 = new KafkaStreams(topology,
properties)) {
+ final MockTime time = new MockTime(0L);
+
+ try (final KafkaStreams kafkaStreams1 = new KafkaStreams(topology,
properties, time);
+ final KafkaStreams kafkaStreams2 = new KafkaStreams(topology,
properties, time);
+ final LogCaptureAppender logCaptureAppender =
LogCaptureAppender.createAndRegister()) {
kafkaStreams1.setUncaughtExceptionHandler(exception ->
SHUTDOWN_APPLICATION);
kafkaStreams2.setUncaughtExceptionHandler(exception ->
SHUTDOWN_APPLICATION);
+ logCaptureAppender.setClassLogger(StreamThread.class, Level.WARN);
startApplicationAndWaitUntilRunning(asList(kafkaStreams1,
kafkaStreams2));
@@ -346,6 +353,8 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
waitForApplicationState(asList(kafkaStreams1, kafkaStreams2),
KafkaStreams.State.ERROR, DEFAULT_DURATION);
assertThat(processorValueCollector.size(), equalTo(1));
+ assertThat("Shutdown warning log message should be exported
exactly once",
+ logCaptureAppender.getMessages("WARN").stream().filter(msg
-> msg.contains("Detected that shutdown was requested")).count(), equalTo(1L));
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index f505fab1554..2a83f0b6123 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -346,6 +346,7 @@ public class StreamThread extends Thread implements
ProcessingThread {
// These are used to signal from outside the stream thread, but the
variables themselves are internal to the thread
private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+ private final AtomicLong lastShutdownWarningTimestamp = new AtomicLong(0L);
private final boolean eosEnabled;
private final boolean stateUpdaterEnabled;
private final boolean processingThreadsEnabled;
@@ -869,8 +870,14 @@ public class StreamThread extends Thread implements
ProcessingThread {
public void maybeSendShutdown() {
if (assignmentErrorCode.get() ==
AssignorError.SHUTDOWN_REQUESTED.code()) {
- log.warn("Detected that shutdown was requested. " +
- "All clients in this app will now begin to shutdown");
+ final long now = time.milliseconds();
+ final long lastLogged = lastShutdownWarningTimestamp.get();
+ if (now - lastLogged >= 10_000L) {
+ if (lastShutdownWarningTimestamp.compareAndSet(lastLogged,
now)) {
+ log.warn("Detected that shutdown was requested. " +
+ "All clients in this app will now begin to
shutdown");
+ }
+ }
mainConsumer.enforceRebalance("Shutdown requested");
}
}