sebastienviale commented on code in PR #17942:
URL: https://github.com/apache/kafka/pull/17942#discussion_r1920319560


##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -55,11 +58,32 @@ default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], b
      *     The exception that occurred during production.
      *
      * @return Whether to continue or stop processing, or retry the failed operation.
+     * @deprecated Use {@link #handleError(ErrorHandlerContext, ProducerRecord, Exception)} instead.
      */
+    @Deprecated
     default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
                                                       final ProducerRecord<byte[], byte[]> record,
                                                       final Exception exception) {
-        return handle(record, exception);
+        throw new UnsupportedOperationException();

Review Comment:
   Indeed, I added a unit test that fails if an `UnsupportedOperationException` is thrown.
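
   To make the concern concrete, here is a minimal, self-contained sketch of why throwing from the deprecated default can break handlers that override only the oldest overload. Everything in it (`DeprecatedDefaultSketch`, `Handler`, `ThrowingHandler`, the `String`-based signatures) is a hypothetical stand-in, not the Kafka Streams API, and it is not the unit test that was added to the PR.

```java
public class DeprecatedDefaultSketch {
    // Hypothetical stand-in for the old response enum.
    public enum Response { CONTINUE, FAIL }

    // Simplified stand-in for the handler interface (assumed shape).
    public interface Handler {
        // Oldest overload: the only one a legacy implementation overrides.
        default Response handle(String record, Exception exception) {
            return Response.FAIL;
        }

        // Context-aware overload: delegating keeps legacy implementations working.
        default Response handle(String context, String record, Exception exception) {
            return handle(record, exception);
        }
    }

    // Variant whose context-aware default throws instead of delegating.
    public interface ThrowingHandler extends Handler {
        @Override
        default Response handle(String context, String record, Exception exception) {
            throw new UnsupportedOperationException();
        }
    }

    // Legacy-style implementations: both override only the oldest overload.
    public static class LegacyDelegating implements Handler {
        @Override
        public Response handle(String record, Exception exception) {
            return Response.CONTINUE;
        }
    }

    public static class LegacyThrowing implements ThrowingHandler {
        @Override
        public Response handle(String record, Exception exception) {
            return Response.CONTINUE;
        }
    }

    // Returns true if calling the context-aware overload throws
    // UnsupportedOperationException, which is what a guarding test would reject.
    public static boolean throwsUnsupported(Handler handler) {
        try {
            handler.handle("ctx", "record", new RuntimeException("send failed"));
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("delegating default throws: " + throwsUnsupported(new LegacyDelegating()));
        System.out.println("throwing default throws: " + throwsUnsupported(new LegacyThrowing()));
    }
}
```

   In this sketch, a test asserting `throwsUnsupported(...) == false` passes with the delegating default and fails with the throwing one.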



##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -147,10 +198,174 @@ enum ProductionExceptionHandlerResponse {
         }
     }
 
+    /**
+     * Enumeration that describes the response from the exception handler.
+     */
+    enum Result {
+        /** Resume processing.
+         *
+         * <p> For this case, output records which could not be written successfully are lost.
+         * Use this option only if you can tolerate data loss.
+         */
+        RESUME(0, "RESUME"),
+        /** Fail processing.
+         *
+         * <p> Kafka Streams will raise an exception and the {@code StreamsThread} will fail.
+         * No offsets (for {@link org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or transactions
+         * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 exactly-once}) will be committed.
+         */
+        FAIL(1, "FAIL"),
+        /** Retry the failed operation.
+         *
+         * <p> Retrying might imply that a {@link TaskCorruptedException} exception is thrown, and that the retry
+         * is started from the last committed offset.
+         *
+         * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for
+         * {@link org.apache.kafka.common.errors.RetriableException retriable exceptions}.
+         * If {@code RETRY} is returned for a non-retriable exception it will be interpreted as {@link #FAIL}.
+         */
+        RETRY(2, "RETRY");
+
+        /**
+         * An english description for the used option. This is for debugging only and may change.
+         */
+        public final String name;
+
+        /**
+         * The permanent and immutable id for the used option. This can't change ever.
+         */
+        public final int id;
+
+        Result(final int id, final String name) {
+            this.id = id;
+            this.name = name;
+        }
+
+        /**
+         * Converts the deprecated enum ProductionExceptionHandlerResponse into the new Result enum.
+         *
+         * @param value the old ProductionExceptionHandlerResponse enum value
+         * @return a {@link ProductionExceptionHandler.Result} enum value
+         * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProductionExceptionHandler.Result}
+         */
+        private static ProductionExceptionHandler.Result from(final ProductionExceptionHandlerResponse value) {
+            switch (value) {
+                case FAIL:
+                    return Result.FAIL;
+                case CONTINUE:
+                    return Result.RESUME;
+                case RETRY:
+                    return Result.RETRY;
+                default:
+                    throw new IllegalArgumentException("No Result enum found for old value: " + value);
+            }
+        }
+    }
+
     enum SerializationExceptionOrigin {
         /** Serialization exception occurred during serialization of the key. */
         KEY,
         /** Serialization exception occurred during serialization of the value. */
         VALUE
     }
+
+    /**
+     * Represents the result of handling a production exception.
+     * <p>
+     * The {@code Response} class encapsulates a {@link ProductionExceptionHandlerResponse},

Review Comment:
   done
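
   For reference, the old-to-new mapping performed by `from` in the hunk above can be tried out in isolation. This is only a sketch: `ResultMappingSketch` and `OldResponse` are hypothetical local stand-ins for the real Kafka Streams types, and `from` is made public here so it can be called from outside the enum.

```java
public class ResultMappingSketch {
    // Hypothetical stand-in for the deprecated ProductionExceptionHandlerResponse enum.
    public enum OldResponse { CONTINUE, FAIL, RETRY }

    // Local copy of the Result enum shape shown in the diff.
    public enum Result {
        RESUME(0, "RESUME"),
        FAIL(1, "FAIL"),
        RETRY(2, "RETRY");

        public final String name;
        public final int id;

        Result(final int id, final String name) {
            this.id = id;
            this.name = name;
        }

        // Maps each old value onto its new counterpart; CONTINUE becomes RESUME.
        public static Result from(final OldResponse value) {
            switch (value) {
                case FAIL:
                    return Result.FAIL;
                case CONTINUE:
                    return Result.RESUME;
                case RETRY:
                    return Result.RETRY;
                default:
                    throw new IllegalArgumentException("No Result enum found for old value: " + value);
            }
        }
    }

    public static void main(String[] args) {
        for (OldResponse old : OldResponse.values()) {
            System.out.println(old + " -> " + Result.from(old));
        }
    }
}
```

   The only rename in the mapping is `CONTINUE` to `RESUME`; `FAIL` and `RETRY` keep their names.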


