sebastienviale commented on code in PR #17942:
URL: https://github.com/apache/kafka/pull/17942#discussion_r1920324025


##########
streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java:
##########
@@ -95,4 +118,137 @@ enum DeserializationHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Continue processing. */
+        RESUME(0, "RESUME"),
+        /** Fail processing. */
+        FAIL(1, "FAIL");
+
+        /**
+         * An English description of the option used. This is for debugging 
only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id of the option used. This can 
never change.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum DeserializationHandlerResponse into 
the new Result enum.
+         *
+         * @param value the old DeserializationHandlerResponse enum value
+         * @return a {@link Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map 
to a valid {@link Result}
+         */
+        private static DeserializationExceptionHandler.Result from(final 
DeserializationHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                default:
+                    throw new IllegalArgumentException("No Result enum found 
for old value: " + value);
+            }
+        }
+    }
+
+    /**
+     * Represents the result of handling a deserialization exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link 
DeserializationExceptionHandler.Result},
+     * indicating whether processing should continue or fail, along with an 
optional list of
+     * {@link ProducerRecord} instances to be sent to a dead letter queue.
+     * </p>
+     */
+    class Response {
+
+        private Result result;
+
+        private List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords;
+
+        /**
+         * Constructs a new {@code Response} object.
+         *
+         * @param result the result indicating whether processing should 
continue or fail;
+         *                                  must not be {@code null}.
+         * @param deadLetterQueueRecords    the list of records to be sent to 
the dead letter queue; may be {@code null}.
+         */
+        private Response(final Result result,
+                         final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            this.result = result;
+            this.deadLetterQueueRecords = deadLetterQueueRecords;
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail(final List<ProducerRecord<byte[], byte[]>> 
deadLetterQueueRecords) {
+            return new Response(Result.FAIL, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should fail.
+         *
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#FAIL} status.
+         */
+        public static Response fail() {
+            return fail(Collections.emptyList());
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @param deadLetterQueueRecords the list of records to be sent to the 
dead letter queue; may be {@code null}.
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#RESUME} status.
+         */
+        public static Response resume(final List<ProducerRecord<byte[], 
byte[]>> deadLetterQueueRecords) {
+            return new Response(Result.RESUME, deadLetterQueueRecords);
+        }
+
+        /**
+         * Creates a {@code Response} indicating that processing should 
continue.
+         *
+         * @return a {@code Response} with a {@link 
DeserializationExceptionHandler.Result#RESUME} status.

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to