cadonna commented on code in PR #17942: URL: https://github.com/apache/kafka/pull/17942#discussion_r1918631948
########## streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; + +/** + * {@code CommonExceptionHandler} Contains utilities method that could be used by all exception handlers + */ +class ExceptionHandlerUtils { + static final String HEADER_ERRORS_EXCEPTION_NAME = "__streams.errors.exception"; + static final String HEADER_ERRORS_STACKTRACE_NAME = "__streams.errors.stacktrace"; + static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = "__streams.errors.message"; + static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic"; + static final String HEADER_ERRORS_PARTITION_NAME = "__streams.errors.partition"; + static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset"; + + + static boolean 
shouldBuildDeadLetterQueueRecord(final String deadLetterQueueTopicName) { + return deadLetterQueueTopicName != null; + } + + /** + * If required, return Dead Letter Queue records for the provided exception + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @param exception Thrown exception + * @return A list of Dead Letter Queue records to produce + */ + static List<ProducerRecord<byte[], byte[]>> maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception exception) { + if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) { + return Collections.emptyList(); + } + + return Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, key, value, context, exception)); + } + + + /** + * Build Dead Letter Queue records for the provided exception + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @return A list of Dead Letter Queue records to produce + */ + static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception e) { + if (deadLetterQueueTopicName == null) { + throw new InvalidConfigurationException(String.format("%s can not be null while building DeadLetterQueue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); Review Comment: ```suggestion throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java: ########## @@ -55,11 +58,32 @@ default 
ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], b * The exception that occurred during production. * * @return Whether to continue or stop processing, or retry the failed operation. + * @deprecated Use {@link #handleError(ErrorHandlerContext, ProducerRecord, Exception)} instead. */ + @Deprecated default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { - return handle(record, exception); + throw new UnsupportedOperationException(); Review Comment: Shouldn't this still call the other deprecated method? Imagine a user implemented ```java handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) ``` but not ```java ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) ``` Streams would throw an `UnsupportedOperationException` although it did not before upgrading to this version. ########## streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java: ########## @@ -147,10 +198,174 @@ enum ProductionExceptionHandlerResponse { } } + /** + * Enumeration that describes the response from the exception handler. + */ + enum Result { + /** Resume processing. + * + * <p> For this case, output records which could not be written successfully are lost. + * Use this option only if you can tolerate data loss. + */ + RESUME(0, "RESUME"), + /** Fail processing. + * + * <p> Kafka Streams will raise an exception and the {@code StreamsThread} will fail. + * No offsets (for {@link org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or transactions + * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 exactly-once}) will be committed. + */ + FAIL(1, "FAIL"), + /** Retry the failed operation. 
+ * + * <p> Retrying might imply that a {@link TaskCorruptedException} exception is thrown, and that the retry + * is started from the last committed offset. + * + * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for + * {@link org.apache.kafka.common.errors.RetriableException retriable exceptions}. + * If {@code RETRY} is returned for a non-retriable exception it will be interpreted as {@link #FAIL}. + */ + RETRY(2, "RETRY"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum ProductionExceptionHandlerResponse into the new Result enum. + * + * @param value the old ProductionExceptionHandlerResponse enum value + * @return a {@link ProductionExceptionHandler.Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProductionExceptionHandler.Result} + */ + private static ProductionExceptionHandler.Result from(final ProductionExceptionHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + case RETRY: + return Result.RETRY; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + enum SerializationExceptionOrigin { /** Serialization exception occurred during serialization of the key. */ KEY, /** Serialization exception occurred during serialization of the value. */ VALUE } + + /** + * Represents the result of handling a production exception. 
+ * <p> + * The {@code Response} class encapsulates a {@link ProductionExceptionHandlerResponse}, Review Comment: ```suggestion * The {@code Response} class encapsulates a {@link Result}, ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java: ########## @@ -147,10 +198,174 @@ enum ProductionExceptionHandlerResponse { } } + /** + * Enumeration that describes the response from the exception handler. + */ + enum Result { + /** Resume processing. + * + * <p> For this case, output records which could not be written successfully are lost. + * Use this option only if you can tolerate data loss. + */ + RESUME(0, "RESUME"), + /** Fail processing. + * + * <p> Kafka Streams will raise an exception and the {@code StreamsThread} will fail. + * No offsets (for {@link org.apache.kafka.streams.StreamsConfig#AT_LEAST_ONCE at-least-once}) or transactions + * (for {@link org.apache.kafka.streams.StreamsConfig#EXACTLY_ONCE_V2 exactly-once}) will be committed. + */ + FAIL(1, "FAIL"), + /** Retry the failed operation. + * + * <p> Retrying might imply that a {@link TaskCorruptedException} exception is thrown, and that the retry + * is started from the last committed offset. + * + * <p> <b>NOTE:</b> {@code RETRY} is only a valid return value for + * {@link org.apache.kafka.common.errors.RetriableException retriable exceptions}. + * If {@code RETRY} is returned for a non-retriable exception it will be interpreted as {@link #FAIL}. + */ + RETRY(2, "RETRY"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum ProductionExceptionHandlerResponse into the new Result enum. 
+ * + * @param value the old ProductionExceptionHandlerResponse enum value + * @return a {@link ProductionExceptionHandler.Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link ProductionExceptionHandler.Result} + */ + private static ProductionExceptionHandler.Result from(final ProductionExceptionHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + case RETRY: + return Result.RETRY; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + enum SerializationExceptionOrigin { /** Serialization exception occurred during serialization of the key. */ KEY, /** Serialization exception occurred during serialization of the value. */ VALUE } + + /** + * Represents the result of handling a production exception. + * <p> + * The {@code Response} class encapsulates a {@link ProductionExceptionHandlerResponse}, + * indicating whether processing should continue or fail, along with an optional list of + * {@link ProducerRecord} instances to be sent to a dead letter queue. + * </p> + */ + class Response { Review Comment: Could you please add some unit tests for this class? ########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java: ########## @@ -45,8 +51,26 @@ public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final return ProcessingHandlerResponse.FAIL; } + @Override + public Response handleError(final ErrorHandlerContext context, + final Record<?, ?> record, + final Exception exception) { + log.warn( Review Comment: This should be `log.error()` I believe. 
########## streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java: ########## @@ -18,35 +18,72 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import java.util.Map; +import static org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { + private String deadLetterQueueTopic = null; + @SuppressWarnings("deprecation") @Deprecated @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) { return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY : + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL; } + @SuppressWarnings("deprecation") + @Deprecated @Override public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY : + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL; + } Review Comment: Those handler methods are not called anymore. You can remove them. ########## streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; + +/** + * {@code CommonExceptionHandler} Contains utilities method that could be used by all exception handlers + */ +class ExceptionHandlerUtils { Review Comment: If this can be used by all exception handlers, why is it then package private and why is it not in the KIP? Since this is not in an internal package it will show up in the javadocs which makes it a public API. Either you make it public and add it to the KIP or you move it to an internal package. The latter means that it can be used by the internal handlers but not by external ones (actually it can be used but without guarantees for backward compatibility). ########## streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java: ########## @@ -95,4 +118,137 @@ enum DeserializationHandlerResponse { } } + /** + * Enumeration that describes the response from the exception handler. 
+ */ + enum Result { + /** Continue processing. */ + RESUME(0, "RESUME"), + /** Fail processing. */ + FAIL(1, "FAIL"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum DeserializationHandlerResponse into the new Result enum. + * + * @param value the old DeserializationHandlerResponse enum value + * @return a {@link Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link Result} + */ + private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + + /** + * Represents the result of handling a deserialization exception. + * <p> + * The {@code Response} class encapsulates a {@link ProcessingExceptionHandler.Result}, + * indicating whether processing should continue or fail, along with an optional list of + * {@link ProducerRecord} instances to be sent to a dead letter queue. + * </p> + */ + class Response { + + private Result result; + + private List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords; + + /** + * Constructs a new {@code DeserializationExceptionResponse} object. + * + * @param result the result indicating whether processing should continue or fail; + * must not be {@code null}. + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. 
+ */ + private Response(final Result result, + final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + this.result = result; + this.deadLetterQueueRecords = deadLetterQueueRecords; + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status. + */ + public static Response fail(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.FAIL, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should fail. + * + * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#FAIL} status. + */ + public static Response fail() { + return fail(Collections.emptyList()); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. + * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status. + */ + public static Response resume(final List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords) { + return new Response(Result.RESUME, deadLetterQueueRecords); + } + + /** + * Creates a {@code Response} indicating that processing should continue. + * + * @return a {@code Response} with a {@link DeserializationHandlerResponse#CONTINUE} status. Review Comment: ```suggestion * @return a {@code Response} with a {@link DeserializationExceptionHandler.Result#RESUME} status. ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; + +/** + * {@code CommonExceptionHandler} Contains utilities method that could be used by all exception handlers + */ +class ExceptionHandlerUtils { + static final String HEADER_ERRORS_EXCEPTION_NAME = "__streams.errors.exception"; + static final String HEADER_ERRORS_STACKTRACE_NAME = "__streams.errors.stacktrace"; + static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = "__streams.errors.message"; + static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic"; + static final String HEADER_ERRORS_PARTITION_NAME = "__streams.errors.partition"; + static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset"; + + + static boolean shouldBuildDeadLetterQueueRecord(final String deadLetterQueueTopicName) { + return deadLetterQueueTopicName != null; + } + + /** + * If required, return Dead Letter Queue records for the provided exception + * @param key Serialized 
key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @param exception Thrown exception + * @return A list of Dead Letter Queue records to produce + */ + static List<ProducerRecord<byte[], byte[]>> maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception exception) { + if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) { + return Collections.emptyList(); + } + + return Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, key, value, context, exception)); + } + + + /** + * Build Dead Letter Queue records for the provided exception + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @return A list of Dead Letter Queue records to produce + */ + static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception e) { Review Comment: ```suggestion static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName, final byte[] key, final byte[] value, final ErrorHandlerContext context, final Exception e) { ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java: ########## @@ -18,35 +18,72 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import java.util.Map; +import static org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception * happens while attempting to produce result records. 
*/ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { + private String deadLetterQueueTopic = null; + @SuppressWarnings("deprecation") @Deprecated @Override public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, final Exception exception) { return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY : + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL; } + @SuppressWarnings("deprecation") + @Deprecated @Override public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + ProductionExceptionHandler.ProductionExceptionHandlerResponse.RETRY : + ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL; + } + + @Override + public Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { + return exception instanceof RetriableException ? + Response.retry() : + Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, null, null, context, exception)); + } + + + @SuppressWarnings("deprecation") + @Deprecated + @Override + public ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context, + final ProducerRecord record, + final Exception exception, + final SerializationExceptionOrigin origin) { + return ProductionExceptionHandler.ProductionExceptionHandlerResponse.FAIL; } Review Comment: Why do you need this? Isn't this equivalent to the default implementation of the interface? Additionally, this handler method is not called in Kafka Streams anymore. You can remove it. 
########## streams/src/main/java/org/apache/kafka/streams/errors/ExceptionHandlerUtils.java: ########## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; + +/** + * {@code CommonExceptionHandler} Contains utilities method that could be used by all exception handlers + */ +class ExceptionHandlerUtils { + static final String HEADER_ERRORS_EXCEPTION_NAME = "__streams.errors.exception"; + static final String HEADER_ERRORS_STACKTRACE_NAME = "__streams.errors.stacktrace"; + static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = "__streams.errors.message"; + static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic"; + static final String HEADER_ERRORS_PARTITION_NAME = "__streams.errors.partition"; + static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset"; + + + static boolean 
shouldBuildDeadLetterQueueRecord(final String deadLetterQueueTopicName) { + return deadLetterQueueTopicName != null; + } + + /** + * If required, return Dead Letter Queue records for the provided exception + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @param exception Thrown exception + * @return A list of Dead Letter Queue records to produce + */ + static List<ProducerRecord<byte[], byte[]>> maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception exception) { + if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) { + return Collections.emptyList(); + } + + return Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, key, value, context, exception)); + } + + + /** + * Build Dead Letter Queue records for the provided exception + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @return A list of Dead Letter Queue records to produce + */ Review Comment: ```suggestion /** * Build dead letter queue record for the provided exception. * * @param key Serialized key for the record. * @param value Serialized value for the record. * @param context error handler context of the exception. * @return A dead letter queue record to produce. */ ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java: ########## @@ -50,6 +54,8 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, return DeserializationHandlerResponse.CONTINUE; } + @SuppressWarnings("deprecation") + @Deprecated @Override public DeserializationHandlerResponse handle(final ErrorHandlerContext context, Review Comment: You can remove those deprecated handler methods. 
They are not called anywhere in the Streams code. ########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java: ########## @@ -50,6 +54,8 @@ public DeserializationHandlerResponse handle(final ProcessorContext context, return DeserializationHandlerResponse.FAIL; } + @SuppressWarnings("deprecation") + @Deprecated @Override public DeserializationHandlerResponse handle(final ErrorHandlerContext context, Review Comment: You can remove those deprecated handler methods. They are not called anywhere in the Streams code. ########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailExceptionHandler.java: ########## @@ -67,8 +73,25 @@ public DeserializationHandlerResponse handle(final ErrorHandlerContext context, return DeserializationHandlerResponse.FAIL; } + @Override + public Response handleError(final ErrorHandlerContext context, + final ConsumerRecord<byte[], byte[]> record, + final Exception exception) { + log.warn( Review Comment: This should be `log.error()`. It is failing, not resuming. ########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProcessingExceptionHandler.java: ########## @@ -16,22 +16,28 @@ */ package org.apache.kafka.streams.errors; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.api.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Processing exception handler that logs a processing exception and then * signals the processing pipeline to stop processing more records and fail. 
*/ public class LogAndFailProcessingExceptionHandler implements ProcessingExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndFailProcessingExceptionHandler.class); + private String deadLetterQueueTopic = null; + @Deprecated @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, + final Record<?, ?> record, final Exception exception) { Review Comment: You can remove this deprecated handler method. It is not called anywhere in the Streams code. ########## streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java: ########## @@ -55,11 +58,32 @@ default ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], b * The exception that occurred during production. * * @return Whether to continue or stop processing, or retry the failed operation. + * @deprecated Use {@link #handleError(ErrorHandlerContext, ProducerRecord, Exception)} instead. */ + @Deprecated default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, final ProducerRecord<byte[], byte[]> record, final Exception exception) { - return handle(record, exception); + throw new UnsupportedOperationException(); Review Comment: Maybe it would be beneficial to add some unit tests that verify this redirection. With such unit tests, this removal would not have happened without some thought about why the test failed. 
########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.java: ########## @@ -16,22 +16,29 @@ */ package org.apache.kafka.streams.errors; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.api.Record; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Processing exception handler that logs a processing exception and then * signals the processing pipeline to continue processing more records. */ public class LogAndContinueProcessingExceptionHandler implements ProcessingExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueProcessingExceptionHandler.class); + private String deadLetterQueueTopic = null; + @Deprecated @Override - public ProcessingHandlerResponse handle(final ErrorHandlerContext context, final Record<?, ?> record, final Exception exception) { + public ProcessingHandlerResponse handle(final ErrorHandlerContext context, Review Comment: You can remove this deprecated handler method. It is not called anywhere in the Streams code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
