lucasbru commented on code in PR #17942:
URL: https://github.com/apache/kafka/pull/17942#discussion_r2200535462


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -619,6 +619,10 @@ public class StreamsConfig extends AbstractConfig {
         "support \"classic\" or \"streams\". If \"streams\" is specified, then 
the streams rebalance protocol will be " +
         "used. Otherwise, the classic group protocol will be used.";
 
+    public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = 
"errors.dead.letter.queue.topic.name";
+    private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If 
not null, the default exception handler will build and send a Dead Letter Queue 
record in the provided topic name if an error occurs.\n" +

Review Comment:
   ```suggestion
       private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = 
"If not null, the default exception handler will build and send a Dead Letter 
Queue record to the topic with the provided name if an error occurs.\n" +
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java:
##########
@@ -18,38 +18,42 @@
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * {@code ProductionExceptionHandler} that always instructs streams to fail 
when an exception
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
+
+    private String deadLetterQueueTopic = null;
+
     @Override
-    public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
-                                                     final Exception 
exception) {
+    public Response handleError(final ErrorHandlerContext context,
+                                final ProducerRecord<byte[], byte[]> record,
+                                final Exception exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+            Response.retry() :
+            
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawKey(), context, exception));
     }
 
+    @SuppressWarnings("rawtypes")
     @Override
-    public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
context,
-                                                     final 
ProducerRecord<byte[], byte[]> record,
-                                                     final Exception 
exception) {
-        return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+    public Response handleSerializationError(final ErrorHandlerContext context,
+                                             final ProducerRecord record,

Review Comment:
   for my understanding, why do we need the rawtype here?



##########
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java:
##########
@@ -95,4 +118,137 @@ enum DeserializationHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Continue processing. */
+        RESUME(0, "RESUME"),
+        /** Fail processing. */
+        FAIL(1, "FAIL");
+
+        /**
+         * An english description for the used option. This is for debugging 
only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum DeserializationHandlerResponse into 
the new Result enum.
+         *
+         * @param value the old DeserializationHandlerResponse enum value
+         * @return a {@link Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link Result}
+         */
+        private static DeserializationExceptionHandler.Result from(final 
DeserializationHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
+    /**
+     * Represents the result of handling a deserialization exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link Result},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private Result result;
+
+        private List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code DeserializationExceptionResponse} object.
+         *
+         * @param result the result indicating whether processing should 
continue or fail;
+         *                                  must not be {@code null}.
+         * @param deadLetterQueueRecords    the list of records to be sent to 
the dead letter queue; may be {@code null}.

Review Comment:
   The `@param` list seems misaligned



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -462,15 +483,15 @@ private void recordSendError(final String topic,
             // TransactionAbortedException is only thrown after 
`abortTransaction()` was called,
             // so it's only a followup error, and Kafka Streams is already 
handling the original error
         } else {
-            final ProductionExceptionHandlerResponse response;
+            final ProductionExceptionHandler.Response response;
             try {
                 response = Objects.requireNonNull(
-                    productionExceptionHandler.handle(
+                    productionExceptionHandler.handleError(
                         errorHandlerContext(context, processorNodeId),
                         serializedRecord,
                         productionException
                     ),
-                    "Invalid ProductionExceptionHandler response."
+                    "Invalid ProductionExceptionResponse response."

Review Comment:
   ```suggestion
                       "Invalid ProductionExceptionHandler response."
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -220,11 +221,11 @@ public void process(final Record<KIn, VIn> record) {
                 internalProcessorContext.recordContext().sourceRawValue()
             );
 
-            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
+            final ProcessingExceptionHandler.Response 
processingExceptionResponse;
             try {
-                response = Objects.requireNonNull(
-                    processingExceptionHandler.handle(errorHandlerContext, 
record, processingException),
-                    "Invalid ProductionExceptionHandler response."
+                processingExceptionResponse = Objects.requireNonNull(

Review Comment:
   nit: response was a fine variable name



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1762,7 +1764,7 @@ public void 
shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsAndProductio
         final StreamsException thrown = assertThrows(StreamsException.class, 
collector::flush);
         assertEquals("Fatal user code error in production error callback", 
thrown.getMessage());
         assertInstanceOf(NullPointerException.class, thrown.getCause());
-        assertEquals("Invalid ProductionExceptionHandler response.", 
thrown.getCause().getMessage());
+        assertEquals("Invalid ProductionExceptionResponse response.", 
thrown.getCause().getMessage());

Review Comment:
   ```suggestion
           assertEquals("Invalid ProductionExceptionHandler response.", 
thrown.getCause().getMessage());
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors.internals;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.ErrorHandlerContext;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code ExceptionHandlerUtils} Contains utilities method that could be used 
by all exception handlers
+ */
+public class ExceptionHandlerUtils {
+    public static final String HEADER_ERRORS_EXCEPTION_NAME = 
"__streams.errors.exception";
+    public static final String HEADER_ERRORS_STACKTRACE_NAME = 
"__streams.errors.stacktrace";
+    public static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = 
"__streams.errors.message";
+    public static final String HEADER_ERRORS_TOPIC_NAME = 
"__streams.errors.topic";
+    public static final String HEADER_ERRORS_PARTITION_NAME = 
"__streams.errors.partition";
+    public static final String HEADER_ERRORS_OFFSET_NAME = 
"__streams.errors.offset";
+
+
+    public static boolean shouldBuildDeadLetterQueueRecord(final String 
deadLetterQueueTopicName) {
+        return deadLetterQueueTopicName != null;
+    }
+
+    /**
+     * If required, return Dead Letter Queue records for the provided exception
+     *
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @param exception Thrown exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    public static List<ProducerRecord<byte[], byte[]>> 
maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
+                                                                               
  final byte[] key,
+                                                                               
  final byte[] value,
+                                                                               
  final ErrorHandlerContext context,
+                                                                               
  final Exception exception) {
+        if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
+            return Collections.emptyList();
+        }
+
+        return 
Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, 
key, value, context, exception));
+    }
+
+
+    /**
+     * Build dead letter queue record for the provided exception.
+     *
+     * @param key Serialized key for the record.
+     * @param value Serialized value for the record.
+     * @param context error handler context of the exception.
+     * @return A dead letter queue record to produce.
+     */
+    public static ProducerRecord<byte[], byte[]> 
buildDeadLetterQueueRecord(final String deadLetterQueueTopicName,
+                                                                     final 
byte[] key,
+                                                                     final 
byte[] value,
+                                                                     final 
ErrorHandlerContext context,
+                                                                     final 
Exception e) {
+        if (deadLetterQueueTopicName == null) {
+            throw new InvalidConfigurationException(String.format("%s cannot 
be null while building dead letter queue record", 
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
+        }
+        final ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, 
value);
+        final StringWriter stackStraceStringWriter = new StringWriter();
+        final PrintWriter stackTracePrintWriter = new 
PrintWriter(stackStraceStringWriter);
+        e.printStackTrace(stackTracePrintWriter);
+
+        try (final StringSerializer stringSerializer = new StringSerializer()) 
{
+            producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, 
stringSerializer.serialize(null, e.toString()));
+            producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, 
stringSerializer.serialize(null, e.getMessage()));
+            producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, 
stringSerializer.serialize(null, stackStraceStringWriter.toString()));

Review Comment:
   Looks valid @Dabz 



##########
streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext;
+import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
+import org.apache.kafka.streams.state.StateSerdes;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@ExtendWith(MockitoExtension.class)
+public class ExceptionHandlerUtilsTest {
+    @Test
+    public void checkDealLetterQueueRecords() {

Review Comment:
   Looks valid @Dabz 



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java:
##########
@@ -357,7 +357,7 @@ public void 
shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() {
             final StreamsException e = assertThrows(StreamsException.class, () 
-> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH));
             assertEquals("Fatal user code error in processing error callback", 
e.getMessage());
             assertInstanceOf(NullPointerException.class, e.getCause());
-            assertEquals("Invalid ProductionExceptionHandler response.", 
e.getCause().getMessage());
+            assertEquals("Invalid ProcessingExceptionResponse response.", 
e.getCause().getMessage());

Review Comment:
   I would drop the second "response" here. Below, it's now just called 
`Response`. so, is this correct?



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java:
##########
@@ -17,47 +17,27 @@
 package org.apache.kafka.streams.errors;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.StreamsConfig;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Deserialization handler that logs a deserialization exception and then
  * signals the processing pipeline to continue processing more records.
  */
 public class LogAndContinueExceptionHandler implements 
DeserializationExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndContinueExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ConsumerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
     @Override
-    public DeserializationHandlerResponse handle(final ProcessorContext 
context,

Review Comment:
   For my understanding - Why can we just remove this?
   
   Don't we have to keep it around for compatibility?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -329,22 +337,22 @@ private <K, V> void handleException(final 
ProductionExceptionHandler.Serializati
                                         final Integer partition,
                                         final Long timestamp,
                                         final String processorNodeId,
-                                        final InternalProcessorContext<Void, 
Void> context,
+                                        final InternalProcessorContext<?, ?> 
context,
                                         final Exception 
serializationException) {
         log.debug(String.format("Error serializing record for topic %s", 
topic), serializationException);
 
         final ProducerRecord<K, V> record = new ProducerRecord<>(topic, 
partition, timestamp, key, value, headers);
 
-        final ProductionExceptionHandlerResponse response;
+        final ProductionExceptionHandler.Response response;
         try {
             response = Objects.requireNonNull(
-                productionExceptionHandler.handleSerializationException(
+                productionExceptionHandler.handleSerializationError(
                     errorHandlerContext(context, processorNodeId),
                     record,
                     serializationException,
                     origin
                 ),
-                "Invalid ProductionExceptionHandler response."
+                "Invalid ProductionExceptionResponse response."

Review Comment:
   ```suggestion
                   "Invalid ProductionExceptionHandler response."
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java:
##########
@@ -1713,7 +1715,7 @@ public void 
shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExcep
 
             assertEquals("Fatal user code error in production error callback", 
exception.getMessage());
             assertInstanceOf(NullPointerException.class, exception.getCause());
-            assertEquals("Invalid ProductionExceptionHandler response.", 
exception.getCause().getMessage());
+            assertEquals("Invalid ProductionExceptionResponse response.", 
exception.getCause().getMessage());

Review Comment:
   ```suggestion
               assertEquals("Invalid ProductionExceptionHandler response.", 
exception.getCause().getMessage());
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -220,11 +221,11 @@ public void process(final Record<KIn, VIn> record) {
                 internalProcessorContext.recordContext().sourceRawValue()
             );
 
-            final ProcessingExceptionHandler.ProcessingHandlerResponse 
response;
+            final ProcessingExceptionHandler.Response 
processingExceptionResponse;
             try {
-                response = Objects.requireNonNull(
-                    processingExceptionHandler.handle(errorHandlerContext, 
record, processingException),
-                    "Invalid ProductionExceptionHandler response."
+                processingExceptionResponse = Objects.requireNonNull(
+                    
processingExceptionHandler.handleError(errorHandlerContext, record, 
processingException),
+                    "Invalid ProcessingExceptionResponse response."

Review Comment:
   It's now called `Response`. 
   
   ```suggestion
                       "Invalid ProcessingExceptionHandler response."
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java:
##########
@@ -18,38 +18,42 @@
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * {@code ProductionExceptionHandler} that always instructs streams to fail 
when an exception
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
-    /**
-     * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
-     */
-    @SuppressWarnings("deprecation")
-    @Deprecated
+
+    private String deadLetterQueueTopic = null;
+
     @Override
-    public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
-                                                     final Exception 
exception) {
+    public Response handleError(final ErrorHandlerContext context,
+                                final ProducerRecord<byte[], byte[]> record,
+                                final Exception exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+            Response.retry() :
+            
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, 
context.sourceRawKey(), context.sourceRawKey(), context, exception));

Review Comment:
   please check this @Dabz 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to