Copilot commented on code in PR #21535:
URL: https://github.com/apache/kafka/pull/21535#discussion_r2873587273


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +316,84 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+
+                        // Processing phase
+                        try {
+                            @SuppressWarnings("unchecked")
+                            final Processor<Object, Object, Object, Object> 
typedSource = 
+                                (Processor<Object, Object, Object, Object>) 
source;
+                            @SuppressWarnings("unchecked")
+                            final Record<Object, Object> typedRecord = 
(Record<Object, Object>) deserializedRecord;
+                            typedSource.process(typedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    storeName,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                final ProcessingExceptionHandler.Response 
response = 
+                                    processingExceptionHandler.handleError(
+                                        errorHandlerContext,
+                                        deserializedRecord,
+                                        processingException
+                                    );
+                                
+                                if (response.result() == 
ProcessingExceptionHandler.Result.FAIL) {
+                                    log.error("Processing exception handler 
chose to fail for record at offset {}", record.offset());
+                                    throw processingException;

Review Comment:
   On `Result.FAIL`, this code logs and then rethrows the original 
`processingException`. For consistency with stream-task processing (and to 
preserve processor/store identity in the exception), consider throwing a 
`FailedProcessingException` (or otherwise wrapping with node/store info) when 
the handler chooses FAIL.
   ```suggestion
                                       throw new ProcessorStateException(
                                           String.format(
                                               "Exception caught during global 
store '%s' processing of record at offset %d",
                                               storeName,
                                               record.offset()
                                           ),
                                           processingException
                                       );
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +316,84 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+
+                        // Processing phase
+                        try {
+                            @SuppressWarnings("unchecked")
+                            final Processor<Object, Object, Object, Object> 
typedSource = 
+                                (Processor<Object, Object, Object, Object>) 
source;
+                            @SuppressWarnings("unchecked")
+                            final Record<Object, Object> typedRecord = 
(Record<Object, Object>) deserializedRecord;
+                            typedSource.process(typedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    storeName,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                final ProcessingExceptionHandler.Response 
response = 
+                                    processingExceptionHandler.handleError(
+                                        errorHandlerContext,
+                                        deserializedRecord,
+                                        processingException
+                                    );
+                                
+                                if (response.result() == 
ProcessingExceptionHandler.Result.FAIL) {
+                                    log.error("Processing exception handler 
chose to fail for record at offset {}", record.offset());
+                                    throw processingException;
+                                }
+                                // RESUME - log and continue
+                                log.warn("Processing exception handler chose 
to resume for record at offset {}", record.offset(), processingException);
+                                droppedRecordsSensor(

Review Comment:
   The handler response's `deadLetterQueueRecords()` (if any) is currently 
ignored in the global restoration path. This silently drops DLQ output that 
user handlers may rely on. If DLQ is not supported for global processing (no 
producer), please at least log/metric that DLQ records were requested but 
cannot be produced (similar to ProcessorNode’s handling) so users understand 
the behavior.



##########
streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java:
##########
@@ -445,6 +445,7 @@ private void setupGlobalTask(final Time mockWallClockTime,
                 globalProcessorContext,
                 globalStateManager,
                 new LogAndContinueExceptionHandler(),
+                null,
                 mockWallClockTime,
                 streamsConfig.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG)

Review Comment:
   TopologyTestDriver always passes a null ProcessingExceptionHandler into 
GlobalStateUpdateTask, so setting 
`processing.exception.handler.global.enabled=true` in test configs will have no 
effect and test-driver behavior will diverge from runtime (GlobalStreamThread 
now wires the handler when enabled). Please plumb the handler from 
`streamsConfig.processingExceptionHandler()` when the new global-enabled config 
is true, and pass null otherwise for backwards compatibility.



##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1250,7 +1256,8 @@ public class StreamsConfig extends AbstractConfig {
                     Type.LONG,
                     null,
                     Importance.LOW,
-                    WINDOW_SIZE_MS_DOC);
+                    WINDOW_SIZE_MS_DOC)
+                .define(PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, 
Type.BOOLEAN, false, Importance.LOW, 
PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_DOC);

Review Comment:
   The new `.define(PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG, ...)` 
line is indented differently than the rest of the fluent `CONFIG` chain, which 
is likely to fail the repository's Checkstyle `Indentation` rule. Please align 
it with the surrounding `.define(...)` calls (and consider wrapping the 
arguments to match the established formatting).
   ```suggestion
               .define(PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,
                       Type.BOOLEAN,
                       false,
                       Importance.LOW,
                       PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_DOC);
   ```



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java:
##########
@@ -342,13 +350,144 @@ public void shouldGetToRunningWithOnlyGlobalTopology() 
throws Exception {
         kafkaStreams.close();
     }
 
+    private void createBuilderWithFailedProcessor() {
+        builder = new StreamsBuilder();
+        builder.addGlobalStore(
+                Stores.keyValueStoreBuilder(
+                        Stores.inMemoryKeyValueStore("test-global-store"),
+                        Serdes.Long(),
+                        Serdes.String()
+                ),
+                globalTableTopic,
+                Consumed.with(Serdes.Long(), Serdes.String()),
+                () -> new ContextualProcessor<Long, String, Void, Void>() {
+                    @Override
+                    public void 
process(org.apache.kafka.streams.processor.api.Record<Long, String> record) {
+                        if (record.key() == 2L) {
+                            throw new RuntimeException("Test processing 
exception");
+                        }
+                    }
+                }
+        );
+    }
+
+    @Test
+    public void 
testProcessingExceptionHandlerContinueEnabledRestorationPhase() throws 
Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = true;
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+            TestGlobalProcessingExceptionHandler.class);
+
+        produceInitialGlobalTableValues();
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
+
+        assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+    }
+    @Test
+    public void testProcessingExceptionHandlerFailEnabledRestorationPhase() 
throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = false;
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        produceInitialGlobalTableValues();
+        assertThrows(StreamsException.class, () -> {
+            startStreams();
+        });
+        assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+
+    }
+    @Test
+    public void testProcessingExceptionHandlerDisabledRestorationPhase() 
throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = false;
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,false);
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        produceInitialGlobalTableValues();
+        assertThrows(StreamsException.class, () -> {
+            startStreams();
+        });
+        assertFalse(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+
+    }
+    @Test
+    public void testProcessingExceptionHandlerContinueEnabledRunTimePhase() 
throws Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        TestGlobalProcessingExceptionHandler.shouldResume = true;
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
+        produceInitialGlobalTableValues();
+        
+        TestUtils.waitForCondition(
+            () -> TestGlobalProcessingExceptionHandler.handlerInvoked.get(),
+            Duration.ofSeconds(30).toMillis(),
+            "Handler was not invoked for key 2L"
+        );
+    }
+    @Test
+    public void testProcessingExceptionHandlerFailEnabledRunTimePhase() throws 
Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,true);
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
+        produceInitialGlobalTableValues();
+        waitForApplicationState(singletonList(kafkaStreams), State.ERROR, 
Duration.ofSeconds(30));
+        assertTrue(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+    }
+    @Test
+    public void testProcessingExceptionHandlerDisabledRunTimePhase() throws 
Exception {
+        createBuilderWithFailedProcessor();
+        // enable processing exception handler invoked config
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_GLOBAL_ENABLED_CONFIG,false);
+        
streamsConfiguration.put(StreamsConfig.PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG,
+                TestGlobalProcessingExceptionHandler.class);
+
+        startStreams();
+        waitForApplicationState(singletonList(kafkaStreams), State.RUNNING, 
Duration.ofSeconds(30));
+        produceInitialGlobalTableValues();
+        waitForApplicationState(singletonList(kafkaStreams), State.ERROR, 
Duration.ofSeconds(30));
+        assertFalse(TestGlobalProcessingExceptionHandler.handlerInvoked.get());
+    }
+
+    public static class TestGlobalProcessingExceptionHandler implements 
ProcessingExceptionHandler {
+        static AtomicBoolean handlerInvoked = new AtomicBoolean(false);
+        static boolean shouldResume = false;
+
+        @Override
+        public Response handleError(ErrorHandlerContext context, 
org.apache.kafka.streams.processor.api.Record<?, ?> record, Exception 
exception) {
+            handlerInvoked.set(true);
+            return shouldResume ? Response.resume() : Response.fail();
+        }

Review Comment:
   `TestGlobalProcessingExceptionHandler.handlerInvoked` is static and never 
reset between tests. This makes the new integration tests order-dependent 
(e.g., a prior test could set it to true, and a later test’s 
`assertFalse`/`assertTrue` becomes meaningless). Reset `handlerInvoked` (and 
any other static flags like `shouldResume`) in `@BeforeEach`/`@AfterEach` or at 
the start of each test.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java:
##########
@@ -311,33 +316,84 @@ private void reprocessState(final List<TopicPartition> 
topicPartitions,
                             record.headers());
                     globalProcessorContext.setRecordContext(recordContext);
 
-                    try {
-                        if (record.key() != null) {
-                            source.process(new Record(
+                    if (record.key() != null) {
+                        // Deserialization phase
+                        final Record<?, ?> deserializedRecord;
+                        try {
+                            deserializedRecord = new Record<>(
                                 
reprocessFactory.keyDeserializer().deserialize(record.topic(), record.key()),
                                 
reprocessFactory.valueDeserializer().deserialize(record.topic(), 
record.value()),
                                 record.timestamp(),
-                                record.headers()));
+                                record.headers());
+                        } catch (final Exception deserializationException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            handleDeserializationFailure(
+                                deserializationExceptionHandler,
+                                globalProcessorContext,
+                                deserializationException,
+                                record,
+                                log,
+                                droppedRecordsSensor(
+                                    Thread.currentThread().getName(),
+                                    globalProcessorContext.taskId().toString(),
+                                    globalProcessorContext.metrics()
+                                ),
+                                null
+                            );
+                            continue; // Skip this record
+                        }
+
+                        // Processing phase
+                        try {
+                            @SuppressWarnings("unchecked")
+                            final Processor<Object, Object, Object, Object> 
typedSource = 
+                                (Processor<Object, Object, Object, Object>) 
source;
+                            @SuppressWarnings("unchecked")
+                            final Record<Object, Object> typedRecord = 
(Record<Object, Object>) deserializedRecord;
+                            typedSource.process(typedRecord);
                             restoreCount++;
                             batchRestoreCount++;
+                        } catch (final Exception processingException) {
+                            // while Java distinguishes checked vs unchecked 
exceptions, other languages
+                            // like Scala or Kotlin do not, and thus we need 
to catch `Exception`
+                            // (instead of `RuntimeException`) to work well 
with those languages
+                            if (processingExceptionHandler != null) {
+                                final ErrorHandlerContext errorHandlerContext 
= new DefaultErrorHandlerContext(
+                                    globalProcessorContext,
+                                    record.topic(),
+                                    record.partition(),
+                                    record.offset(),
+                                    record.headers(),
+                                    storeName,
+                                    globalProcessorContext.taskId(),
+                                    record.timestamp(),
+                                    record.key(),
+                                    record.value()
+                                );
+                                final ProcessingExceptionHandler.Response 
response = 
+                                    processingExceptionHandler.handleError(
+                                        errorHandlerContext,
+                                        deserializedRecord,
+                                        processingException
+                                    );

Review Comment:
   The call to `processingExceptionHandler.handleError(...)` isn't guarded 
against the handler throwing or returning null. In other Streams paths (e.g., 
StreamTask/ProcessorNode), the response is `requireNonNull`-checked and handler 
exceptions are treated as fatal user-code errors. Please apply the same 
defensive handling here to avoid NPEs or inconsistent error propagation during 
global restoration.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java:
##########
@@ -404,6 +405,7 @@ private StateConsumer initialize() {
                     globalProcessorContext,
                     stateMgr,
                     config.deserializationExceptionHandler(),
+                    processingExceptionHandler,

Review Comment:
   `processingExceptionHandler` is instantiated here via 
`config.processingExceptionHandler()`, but `GlobalStateManagerImpl` also 
instantiates its own handler instance. That means restoration-phase errors 
(handled in GlobalStateManagerImpl) and runtime global processing errors 
(handled via GlobalStateUpdateTask/ProcessorNode) will use different handler 
objects, which is surprising if handlers keep state/metrics. Consider 
creating/configuring the handler once and sharing the same instance across both 
components.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -254,15 +254,21 @@ public void process(final Record<KIn, VIn> record) {
 
             final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords 
= response.deadLetterQueueRecords();
             if (!deadLetterQueueRecords.isEmpty()) {
-                final RecordCollector collector = ((RecordCollector.Supplier) 
internalProcessorContext).recordCollector();
-                for (final ProducerRecord<byte[], byte[]> 
deadLetterQueueRecord : deadLetterQueueRecords) {
-                    collector.send(
-                            deadLetterQueueRecord.key(),
-                            deadLetterQueueRecord.value(),
-                            name(),
-                            internalProcessorContext,
-                            deadLetterQueueRecord
-                    );
+                if (!(internalProcessorContext instanceof 
RecordCollector.Supplier)) {
+                    log.warn("Dead letter queue records cannot be sent for 
GlobalKTable processors " +
+                            "(no producer available). DLQ support for 
GlobalKTable will be addressed in a future KIP. " +
+                            "Record details logged: topic={}, headers={}", 
internalProcessorContext.topic(), internalProcessorContext.headers());

Review Comment:
   This warning logs `internalProcessorContext.headers()` directly. Record 
headers can contain user data and can be large, so logging them may leak 
sensitive information and create excessive log volume. Consider logging only 
topic/partition/offset (or the header keys/count) rather than the full headers 
payload.
   ```suggestion
                               "Record details logged: topic={}, 
headerCount={}", internalProcessorContext.topic(), 
internalProcessorContext.headers().toArray().length);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to