This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ec3694e83d5 KAFKA-19939 StreamsConfig should log WARN by default 
(#21682)
ec3694e83d5 is described below

commit ec3694e83d512eea929ffac2cb5e40ccf6d4b110
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 9 06:39:58 2026 -0700

    KAFKA-19939 StreamsConfig should log WARN by default (#21682)
    
    For backward compatibility, we keep the processing exception handler
    disabled on the global thread by default. StreamsConfig should WARN
    about it, to encourage users to enable it.
    
    Reviewers: Lucas Brutschy <[email protected]>, Arpit goyal
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  6 +++++
 .../apache/kafka/streams/StreamsConfigTest.java    | 28 ++++++++++++++++++++++
 2 files changed, 34 insertions(+)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index d4767aa184b..f2f402034c9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1536,6 +1536,12 @@ public class StreamsConfig extends AbstractConfig {
         
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
         verifyClientTelemetryConfigs();
         verifyStreamsProtocolCompatibility(doLog);
+        if 
(!getBoolean(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG)) 
{
+            log.warn("Processing exception handler is not enabled for the 
GlobalThread. " +
+                "It's recommended to set `" + 
StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG + "` to true 
to enable it. " +
+                "Enabling the processing exception handler for global 
state/KTable processing now, ensures future backward compatibility. " +
+                "The processing exception handler will get enabled by default 
with Apache Kafka 5.0 release.");
+        }
     }
 
     private void verifyStreamsProtocolCompatibility(final boolean doLog) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index d58909094cf..f33aa9bd767 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1786,11 +1786,39 @@ public class StreamsConfigTest {
             "Please set group.protocol=classic or remove group.instance.id 
from the configuration."));
     }
 
+    @Test
     public void shouldSetDefaultDeadLetterQueue() {
         final StreamsConfig config = new StreamsConfig(props);
         
assertNull(config.getString(StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
     }
 
+    @Test
+    public void 
shouldLogWarningWhenProcessingExceptionHandlerIsNotEnabledOnGlobalThread() {
+        try (LogCaptureAppender streamsConfigLogs = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN);
+            streamsConfig = new StreamsConfig(props);
+
+            assertEquals(1, streamsConfigLogs.getMessages().size());
+            assertTrue(streamsConfigLogs
+                .getMessages(Level.WARN.name())
+                .get(0)
+                .startsWith("Processing exception handler is not enabled for 
the GlobalThread.")
+            );
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    @Test
+    public void 
shouldNotLogWarningWhenProcessingExceptionHandlerIsEnabledOnGlobalThread() {
+        
props.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, 
true);
+        try (LogCaptureAppender streamsConfigLogs = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            streamsConfigLogs.setClassLogger(StreamsConfig.class, Level.WARN);
+            streamsConfig = new StreamsConfig(props);
+
+            assertEquals(0, streamsConfigLogs.getMessages().size());
+        }
+    }
+
     static class MisconfiguredSerde implements Serde<Object> {
         @Override
         public void configure(final Map<String, ?>  configs, final boolean 
isKey) {

Reply via email to