This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3db4a781676 HOTFIX: fix compilation error
3db4a781676 is described below

commit 3db4a781676199af0be2b67989ca2204c64788fc
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon Jul 29 21:07:49 2024 -0700

    HOTFIX: fix compilation error
---
 .../apache/kafka/streams/processor/internals/ProcessorNode.java   | 2 +-
 .../kafka/streams/processor/internals/RecordCollectorImpl.java    | 8 ++++++--
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 65eec47cb1d..763edc9a045 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -205,7 +205,7 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
             throw e;
         } catch (final Exception e) {
             final ErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
-                null,
+                null, // only required to pass for 
DeserializationExceptionHandler
                 internalProcessorContext.topic(),
                 internalProcessorContext.partition(),
                 internalProcessorContext.offset(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 35097153a5f..de4afc2c924 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -301,12 +301,14 @@ public class RecordCollectorImpl implements 
RecordCollector {
 
         try {
             final DefaultErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
+                null, // only required to pass for 
DeserializationExceptionHandler
                 context.recordContext().topic(),
                 context.recordContext().partition(),
                 context.recordContext().offset(),
                 context.recordContext().headers(),
                 processorNodeId,
-                taskId);
+                taskId
+            );
             response = 
productionExceptionHandler.handleSerializationException(errorHandlerContext, 
record, exception, origin);
         } catch (final Exception e) {
             log.error("Fatal when handling serialization exception", e);
@@ -395,12 +397,14 @@ public class RecordCollectorImpl implements 
RecordCollector {
                 sendException.set(new 
TaskCorruptedException(Collections.singleton(taskId)));
             } else {
                 final DefaultErrorHandlerContext errorHandlerContext = new 
DefaultErrorHandlerContext(
+                    null, // only required to pass for 
DeserializationExceptionHandler
                     context.recordContext().topic(),
                     context.recordContext().partition(),
                     context.recordContext().offset(),
                     context.recordContext().headers(),
                     processorNodeId,
-                    taskId);
+                    taskId
+                );
 
                 if (productionExceptionHandler.handle(errorHandlerContext, 
serializedRecord, exception) == ProductionExceptionHandlerResponse.FAIL) {
                     errorMessage += "\nException handler choose to FAIL the 
processing, no more records would be sent.";

Reply via email to