cadonna commented on code in PR #17942:
URL: https://github.com/apache/kafka/pull/17942#discussion_r1918631948


##########
streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code CommonExceptionHandler} Contains utilities method that could be used 
by all exception handlers
+ */
+class ExceptionHandlerUtils {
+    static final String HEADER_ERRORS_EXCEPTION_NAME = 
"__streams.errors.exception";
+    static final String HEADER_ERRORS_STACKTRACE_NAME = 
"__streams.errors.stacktrace";
+    static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = 
"__streams.errors.message";
+    static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic";
+    static final String HEADER_ERRORS_PARTITION_NAME = 
"__streams.errors.partition";
+    static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset";
+
+
+    static boolean shouldBuildDeadLetterQueueRecord(final String 
deadLetterQueueTopicName) {
+        return deadLetterQueueTopicName != null;
+    }
+
+    /**
+     * If required, return Dead Letter Queue records for the provided exception
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @param exception Thrown exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    static List<ProducerRecord<byte[], byte[]>> 
maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
+                                                                               
  final byte[] key,
+                                                                               
  final byte[] value,
+                                                                               
  final ErrorHandlerContext context,
+                                                                               
  final Exception exception) {
+        if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
+            return Collections.emptyList();
+        }
+
+        return 
Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, 
key, value, context, exception));
+    }
+
+
+    /**
+     * Build Dead Letter Queue records for the provided exception
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final 
String deadLetterQueueTopicName,
+                                                                            
final byte[] key,
+                                                                            
final byte[] value,
+                                                                            
final ErrorHandlerContext context,
+                                                                            
final Exception e) {
+        if (deadLetterQueueTopicName == null) {
+            throw new InvalidConfigurationException(String.format("%s can not 
be null while building DeadLetterQueue record", 
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));

Review Comment:
   ```suggestion
               throw new InvalidConfigurationException(String.format("%s cannot 
be null while building dead letter queue record", 
StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG));
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -55,11 +58,32 @@ default ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], b
      *     The exception that occurred during production.
      *
      * @return Whether to continue or stop processing, or retry the failed 
operation.
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
      */
+    @Deprecated
     default ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
                                                       final 
ProducerRecord<byte[], byte[]> record,
                                                       final Exception 
exception) {
-        return handle(record, exception);
+        throw new UnsupportedOperationException();

Review Comment:
   Shouldn't this still call the other deprecated method?
   Imagine a user implemented 
   ```java
   handle(final ProducerRecord<byte[], byte[]> record, final Exception 
exception)
   ```
    but not 
   ```java
   ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, 
                                             final ProducerRecord<byte[], 
byte[]> record,
                                             final Exception exception)
   ```
   Streams would throw an `UnsupportedOperationException` although it did not 
before upgrading to this version.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -147,10 +198,174 @@ enum ProductionExceptionHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Resume processing.
+         *
+         * <p> For this case, output records which could not be written 
successfully are lost.
+         * Use this option only if you can tolerate data loss.
+         */
+        RESUME(0, "RESUME"),
+        /** Fail processing.
+         *
+         * <p> Kafka Streams will raise an exception and the {@code 
StreamsThread} will fail.
+         * No offsets (for {@link 
org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or 
transactions
+         * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 
exactly-once}) will be committed.
+         */
+        FAIL(1, "FAIL"),
+        /** Retry the failed operation.
+         *
+         * <p> Retrying might imply that a {@link TaskCorruptedException} 
exception is thrown, and that the retry
+         * is started from the last committed offset.
+         *
+         * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
+         * {@link org.apache.kafka.common.errors.RetriableException retriable 
exceptions}.
+         * If {@code RETRY} is returned for a non-retriable exception it will 
be interpreted as {@link #FAIL}.
+         */
+        RETRY(2, "RETRY");
+
+        /**
+         * An english description for the used option. This is for debugging 
only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum ProductionExceptionHandlerResponse 
into the new Result enum.
+         *
+         * @param value the old ProductionExceptionHandlerResponse enum value
+         * @return a {@link ProductionExceptionHandler.Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link ProductionExceptionHandler.Result}
+         */
+        private static ProductionExceptionHandler.Result from(final 
ProductionExceptionHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                case RETRY:
+                    return Result.RETRY;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
     enum SerializationExceptionOrigin {
         /** Serialization exception occurred during serialization of the key. 
*/
         KEY,
         /** Serialization exception occurred during serialization of the 
value. */
         VALUE
     }
+
+    /**
+     * Represents the result of handling a production exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link 
ProductionExceptionHandlerResponse},

Review Comment:
   ```suggestion
        * The {@code Response} class encapsulates a {@link Result},
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -147,10 +198,174 @@ enum ProductionExceptionHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Resume processing.
+         *
+         * <p> For this case, output records which could not be written 
successfully are lost.
+         * Use this option only if you can tolerate data loss.
+         */
+        RESUME(0, "RESUME"),
+        /** Fail processing.
+         *
+         * <p> Kafka Streams will raise an exception and the {@code 
StreamsThread} will fail.
+         * No offsets (for {@link 
org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or 
transactions
+         * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 
exactly-once}) will be committed.
+         */
+        FAIL(1, "FAIL"),
+        /** Retry the failed operation.
+         *
+         * <p> Retrying might imply that a {@link TaskCorruptedException} 
exception is thrown, and that the retry
+         * is started from the last committed offset.
+         *
+         * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
+         * {@link org.apache.kafka.common.errors.RetriableException retriable 
exceptions}.
+         * If {@code RETRY} is returned for a non-retriable exception it will 
be interpreted as {@link #FAIL}.
+         */
+        RETRY(2, "RETRY");
+
+        /**
+         * An english description for the used option. This is for debugging 
only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum ProductionExceptionHandlerResponse 
into the new Result enum.
+         *
+         * @param value the old ProductionExceptionHandlerResponse enum value
+         * @return a {@link ProductionExceptionHandler.Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link ProductionExceptionHandler.Result}
+         */
+        private static ProductionExceptionHandler.Result from(final 
ProductionExceptionHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                case RETRY:
+                    return Result.RETRY;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
     enum SerializationExceptionOrigin {
         /** Serialization exception occurred during serialization of the key. 
*/
         KEY,
         /** Serialization exception occurred during serialization of the 
value. */
         VALUE
     }
+
+    /**
+     * Represents the result of handling a production exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link 
ProductionExceptionHandlerResponse},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {

Review Comment:
   Could you please add some unit tests for this class?



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java:
##########
@@ -45,8 +51,26 @@ public ProcessingHandlerResponse handle(final 
ErrorHandlerContext context, final
         return ProcessingHandlerResponse.FAIL;
     }
 
+    @Override
+    public Response handleError(final ErrorHandlerContext context,
+                                final Record<?, ?> record,
+                                final Exception exception) {
+        log.warn(

Review Comment:
   This should be `log.error()` I believe.



##########
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java:
##########
@@ -18,35 +18,72 @@
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * {@code ProductionExceptionHandler} that always instructs streams to fail 
when an exception
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
+    private String deadLetterQueueTopic = null;
+
     @SuppressWarnings("deprecation")
     @Deprecated
     @Override
     public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
                                                      final Exception 
exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+            
ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY :
+            ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
     }
 
+    @SuppressWarnings("deprecation")
+    @Deprecated
     @Override
     public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
context,
                                                      final 
ProducerRecord<byte[], byte[]> record,
                                                      final Exception 
exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+                
ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY :
+                
ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
+    }

Review Comment:
   Those handler methods are not called anymore. You can remove them.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code CommonExceptionHandler} Contains utilities method that could be used 
by all exception handlers
+ */
+class ExceptionHandlerUtils {

Review Comment:
   If this can be used by all exception handlers, why is it then package 
private and why is it not in the KIP?
   Since this is not in an internal package it will show up in the javadocs 
which makes it a public API.
   Either you make it public and add it to the KIP or you move it to an 
internal package. The latter means that it can be used by the internal handlers 
but not by external ones (actually it can be used but with guarantees for 
backward compatibility).



##########
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java:
##########
@@ -95,4 +118,137 @@ enum DeserializationHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Continue processing. */
+        RESUME(0, "RESUME"),
+        /** Fail processing. */
+        FAIL(1, "FAIL");
+
+        /**
+         * An english description for the used option. This is for debugging 
only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't 
change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum DeserializationHandlerResponse into 
the new Result enum.
+         *
+         * @param value the old DeserializationHandlerResponse enum value
+         * @return a {@link Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link Result}
+         */
+        private static DeserializationExceptionHandler.Result from(final 
DeserializationHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
+    /**
+     * Represents the result of handling a deserialization exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link 
ProcessingExceptionHandler.Result},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private Result result;
+
+        private List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code DeserializationExceptionResponse} object.
+         *
+         * @param result the result indicating whether processing should 
continue or fail;
+         *                                  must not be {@code null}.
+         * @param deadLetterQueueRecords    the list of records to be sent to 
the dead letter queue; may be {@code null}.
+         */
+        private Response(final Result result,
+                         final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            this.result = result;
+            this.deadLetterQueueRecords = deadLetterQueueRecords;
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail(final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            return new Response(Result.FAIL, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail() {
+            return fail(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume(final List<ProducerRecord<byte[], 
byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.RESUME, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @return a {@code Response} with a {@link 
DeserializationHandlerResponse#CONTINUE} status.

Review Comment:
   ```suggestion
            * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#RESUME} status.
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code CommonExceptionHandler} Contains utilities method that could be used 
by all exception handlers
+ */
+class ExceptionHandlerUtils {
+    static final String HEADER_ERRORS_EXCEPTION_NAME = 
"__streams.errors.exception";
+    static final String HEADER_ERRORS_STACKTRACE_NAME = 
"__streams.errors.stacktrace";
+    static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = 
"__streams.errors.message";
+    static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic";
+    static final String HEADER_ERRORS_PARTITION_NAME = 
"__streams.errors.partition";
+    static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset";
+
+
+    static boolean shouldBuildDeadLetterQueueRecord(final String 
deadLetterQueueTopicName) {
+        return deadLetterQueueTopicName != null;
+    }
+
+    /**
+     * If required, return Dead Letter Queue records for the provided exception
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @param exception Thrown exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    static List<ProducerRecord<byte[], byte[]>> 
maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
+                                                                               
  final byte[] key,
+                                                                               
  final byte[] value,
+                                                                               
  final ErrorHandlerContext context,
+                                                                               
  final Exception exception) {
+        if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
+            return Collections.emptyList();
+        }
+
+        return 
Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, 
key, value, context, exception));
+    }
+
+
+    /**
+     * Build Dead Letter Queue records for the provided exception
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final 
String deadLetterQueueTopicName,
+                                                                            
final byte[] key,
+                                                                            
final byte[] value,
+                                                                            
final ErrorHandlerContext context,
+                                                                            
final Exception e) {

Review Comment:
   ```suggestion
       static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final 
String deadLetterQueueTopicName,
                                                                        final 
byte[] key,
                                                                        final 
byte[] value,
                                                                        final 
ErrorHandlerContext context,
                                                                        final 
Exception e) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java:
##########
@@ -18,35 +18,72 @@
 
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.streams.StreamsConfig;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * {@code ProductionExceptionHandler} that always instructs streams to fail 
when an exception
  * happens while attempting to produce result records.
  */
 public class DefaultProductionExceptionHandler implements 
ProductionExceptionHandler {
+    private String deadLetterQueueTopic = null;
+
     @SuppressWarnings("deprecation")
     @Deprecated
     @Override
     public ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], byte[]> record,
                                                      final Exception 
exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+            
ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY :
+            ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
     }
 
+    @SuppressWarnings("deprecation")
+    @Deprecated
     @Override
     public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext 
context,
                                                      final 
ProducerRecord<byte[], byte[]> record,
                                                      final Exception 
exception) {
         return exception instanceof RetriableException ?
-            ProductionExceptionHandlerResponse.RETRY :
-            ProductionExceptionHandlerResponse.FAIL;
+                
ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY :
+                
ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
+    }
+
+    @Override
+    public Response handleError(final ErrorHandlerContext context,
+                                final ProducerRecord<byte[], byte[]> record,
+                                final Exception exception) {
+        return exception instanceof RetriableException ?
+            Response.retry() :
+            
Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, null, 
null, context, exception));
+    }
+
+
+    @SuppressWarnings("deprecation")
+    @Deprecated
+    @Override
+    public ProductionExceptionHandlerResponse 
handleSerializationException(final ErrorHandlerContext context,
+                                                                           
final ProducerRecord record,
+                                                                           
final Exception exception,
+                                                                           
final SerializationExceptionOrigin origin) {
+        return 
ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL;
     }

Review Comment:
   Why do you need this? Isn't this equivalent to the default implementation of 
the interface?
   Additionally, this handler method is not called in Kafka Streams anymore. 
You can remove it. 



##########
streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java:
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.errors.InvalidConfigurationException;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * {@code CommonExceptionHandler} Contains utilities method that could be used 
by all exception handlers
+ */
+class ExceptionHandlerUtils {
+    static final String HEADER_ERRORS_EXCEPTION_NAME = 
"__streams.errors.exception";
+    static final String HEADER_ERRORS_STACKTRACE_NAME = 
"__streams.errors.stacktrace";
+    static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = 
"__streams.errors.message";
+    static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic";
+    static final String HEADER_ERRORS_PARTITION_NAME = 
"__streams.errors.partition";
+    static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset";
+
+
+    static boolean shouldBuildDeadLetterQueueRecord(final String 
deadLetterQueueTopicName) {
+        return deadLetterQueueTopicName != null;
+    }
+
+    /**
+     * If required, return Dead Letter Queue records for the provided exception
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @param exception Thrown exception
+     * @return A list of Dead Letter Queue records to produce
+     */
+    static List<ProducerRecord<byte[], byte[]>> 
maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName,
+                                                                               
  final byte[] key,
+                                                                               
  final byte[] value,
+                                                                               
  final ErrorHandlerContext context,
+                                                                               
  final Exception exception) {
+        if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) {
+            return Collections.emptyList();
+        }
+
+        return 
Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, 
key, value, context, exception));
+    }
+
+
+    /**
+     * Build Dead Letter Queue records for the provided exception
+     * @param key Serialized key for the records
+     * @param value Serialized value for the records
+     * @param context ErrorHandlerContext of the exception
+     * @return A list of Dead Letter Queue records to produce
+     */

Review Comment:
   ```suggestion
       /**
        * Build dead letter queue record for the provided exception.
        *
        * @param key       Serialized key for the record.
        * @param value    Serialized value for the record.
        * @param context error handler context of the exception.
        * @return A dead letter queue record to produce.
        */
   ```



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java:
##########
@@ -50,6 +54,8 @@ public DeserializationHandlerResponse handle(final 
ProcessorContext context,
         return DeserializationHandlerResponse.CONTINUE;
     }
 
+    @SuppressWarnings("deprecation")
+    @Deprecated
     @Override
     public DeserializationHandlerResponse handle(final ErrorHandlerContext 
context,

Review Comment:
   You can remove those deprecated handler methods. They are not called 
anywhere in the Streams code.



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java:
##########
@@ -50,6 +54,8 @@ public DeserializationHandlerResponse handle(final 
ProcessorContext context,
         return DeserializationHandlerResponse.FAIL;
     }
 
+    @SuppressWarnings("deprecation")
+    @Deprecated
     @Override
     public DeserializationHandlerResponse handle(final ErrorHandlerContext 
context,

Review Comment:
   You can remove those deprecated handler methods. They are not called 
anywhere in the Streams code.



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java:
##########
@@ -67,8 +73,25 @@ public DeserializationHandlerResponse handle(final 
ErrorHandlerContext context,
         return DeserializationHandlerResponse.FAIL;
     }
 
+    @Override
+    public Response handleError(final ErrorHandlerContext context,
+                                final ConsumerRecord<byte[], byte[]> record,
+                                final Exception exception) {
+        log.warn(

Review Comment:
   This should be `log.error()`. It is failing, not resuming.



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java:
##########
@@ -16,22 +16,28 @@
  */
 package org.apache.kafka.streams.errors;
 
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.api.Record;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Processing exception handler that logs a processing exception and then
  * signals the processing pipeline to stop processing more records and fail.
  */
 public class LogAndFailProcessingExceptionHandler implements 
ProcessingExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
+    @Deprecated
     @Override
-    public ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, final Exception exception) {
+    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,
+                                            final Record<?, ?> record, final 
Exception exception) {

Review Comment:
   You can remove this deprecated handler method. It is not called anywhere in 
the Streams code.



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -55,11 +58,32 @@ default ProductionExceptionHandlerResponse handle(final 
ProducerRecord<byte[], b
      *     The exception that occurred during production.
      *
      * @return Whether to continue or stop processing, or retry the failed 
operation.
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, 
ProducerRecord, Exception)} instead.
      */
+    @Deprecated
     default ProductionExceptionHandlerResponse handle(final 
ErrorHandlerContext context,
                                                       final 
ProducerRecord<byte[], byte[]> record,
                                                       final Exception 
exception) {
-        return handle(record, exception);
+        throw new UnsupportedOperationException();

Review Comment:
   Maybe it would be beneficial to add some unit tests that verify this 
redirection. With such unit tests, this removal would had happened without some 
thoughts about why the test failed.



##########
streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java:
##########
@@ -16,22 +16,29 @@
  */
 package org.apache.kafka.streams.errors;
 
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.api.Record;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+import static 
org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords;
+
 /**
  * Processing exception handler that logs a processing exception and then
  * signals the processing pipeline to continue processing more records.
  */
 public class LogAndContinueProcessingExceptionHandler implements 
ProcessingExceptionHandler {
     private static final Logger log = 
LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class);
+    private String deadLetterQueueTopic = null;
 
+    @Deprecated
     @Override
-    public ProcessingHandlerResponse handle(final ErrorHandlerContext context, 
final Record<?, ?> record, final Exception exception) {
+    public ProcessingHandlerResponse handle(final ErrorHandlerContext context,

Review Comment:
   You can remove this deprecated handler method. It is not called anywhere in 
the Streams code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to