lucasbru commented on code in PR #17942: URL: https://github.com/apache/kafka/pull/17942#discussion_r2200535462
########## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ########## @@ -619,6 +619,10 @@ public class StreamsConfig extends AbstractConfig { "support \"classic\" or \"streams\". If \"streams\" is specified, then the streams rebalance protocol will be " + "used. Otherwise, the classic group protocol will be used."; + public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name"; + private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record in the provided topic name if an error occurs.\n" + Review Comment: ```suggestion private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" + ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java: ########## @@ -18,38 +18,42 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { - /** - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. - */ - @SuppressWarnings("deprecation") - @Deprecated + + private String deadLetterQueueTopic = null; + @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, - final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + Response.retry() : + Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawKey(), context, exception)); } + @SuppressWarnings("rawtypes") @Override - public ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context, - final ProducerRecord<byte[], byte[]> record, - final Exception exception) { - return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + public Response handleSerializationError(final ErrorHandlerContext context, + final ProducerRecord record, Review Comment: for my understanding, why do we need the rawtype here? ########## streams/src/main/java/org/apache/kafka/streams/errors/DeserializationExceptionHandler.java: ########## @@ -95,4 +118,137 @@ enum DeserializationHandlerResponse { } } + /** + * Enumeration that describes the response from the exception handler. + */ + enum Result { + /** Continue processing. */ + RESUME(0, "RESUME"), + /** Fail processing. */ + FAIL(1, "FAIL"); + + /** + * An english description for the used option. This is for debugging only and may change. + */ + public final String name; + + /** + * The permanent and immutable id for the used option. This can't change ever. + */ + public final int id; + + Result(final int id, final String name) { + this.id = id; + this.name = name; + } + + /** + * Converts the deprecated enum DeserializationHandlerResponse into the new Result enum. + * + * @param value the old DeserializationHandlerResponse enum value + * @return a {@link Result} enum value + * @throws IllegalArgumentException if the provided value does not map to a valid {@link Result} + */ + private static DeserializationExceptionHandler.Result from(final DeserializationHandlerResponse value) { + switch (value) { + case FAIL: + return Result.FAIL; + case CONTINUE: + return Result.RESUME; + default: + throw new IllegalArgumentException("No Result enum found for old value: " + value); + } + } + } + + /** + * Represents the result of handling a deserialization exception. + * <p> + * The {@code Response} class encapsulates a {@link Result}, + * indicating whether processing should continue or fail, along with an optional list of + * {@link ProducerRecord} instances to be sent to a dead letter queue. + * </p> + */ + class Response { + + private Result result; + + private List<ProducerRecord<byte[], byte[]>> deadLetterQueueRecords; + + /** + * Constructs a new {@code DeserializationExceptionResponse} object. + * + * @param result the result indicating whether processing should continue or fail; + * must not be {@code null}. + * @param deadLetterQueueRecords the list of records to be sent to the dead letter queue; may be {@code null}. Review Comment: The `@param` list seems misaligned ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java: ########## @@ -462,15 +483,15 @@ private void recordSendError(final String topic, // TransactionAbortedException is only thrown after `abortTransaction()` was called, // so it's only a followup error, and Kafka Streams is already handling the original error } else { - final ProductionExceptionHandlerResponse response; + final ProductionExceptionHandler.Response response; try { response = Objects.requireNonNull( - productionExceptionHandler.handle( + productionExceptionHandler.handleError( errorHandlerContext(context, processorNodeId), serializedRecord, productionException ), - "Invalid ProductionExceptionHandler response." + "Invalid ProductionExceptionResponse response." Review Comment: ```suggestion "Invalid ProductionExceptionHandler response." ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java: ########## @@ -220,11 +221,11 @@ public void process(final Record<KIn, VIn> record) { internalProcessorContext.recordContext().sourceRawValue() ); - final ProcessingExceptionHandler.ProcessingHandlerResponse response; + final ProcessingExceptionHandler.Response processingExceptionResponse; try { - response = Objects.requireNonNull( - processingExceptionHandler.handle(errorHandlerContext, record, processingException), - "Invalid ProductionExceptionHandler response." + processingExceptionResponse = Objects.requireNonNull( Review Comment: nit: response was a fine variable name ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1762,7 +1764,7 @@ public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsAndProductio final StreamsException thrown = assertThrows(StreamsException.class, collector::flush); assertEquals("Fatal user code error in production error callback", thrown.getMessage()); assertInstanceOf(NullPointerException.class, thrown.getCause()); - assertEquals("Invalid ProductionExceptionHandler response.", thrown.getCause().getMessage()); + assertEquals("Invalid ProductionExceptionResponse response.", thrown.getCause().getMessage()); Review Comment: ```suggestion assertEquals("Invalid ProductionExceptionHandler response.", thrown.getCause().getMessage()); ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/internals/ExceptionHandlerUtils.java: ########## @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.errors.internals; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.errors.InvalidConfigurationException; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.ErrorHandlerContext; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.Collections; +import java.util.List; + +/** + * {@code ExceptionHandlerUtils} Contains utilities method that could be used by all exception handlers + */ +public class ExceptionHandlerUtils { + public static final String HEADER_ERRORS_EXCEPTION_NAME = "__streams.errors.exception"; + public static final String HEADER_ERRORS_STACKTRACE_NAME = "__streams.errors.stacktrace"; + public static final String HEADER_ERRORS_EXCEPTION_MESSAGE_NAME = "__streams.errors.message"; + public static final String HEADER_ERRORS_TOPIC_NAME = "__streams.errors.topic"; + public static final String HEADER_ERRORS_PARTITION_NAME = "__streams.errors.partition"; + public static final String HEADER_ERRORS_OFFSET_NAME = "__streams.errors.offset"; + + + public static boolean shouldBuildDeadLetterQueueRecord(final String deadLetterQueueTopicName) { + return deadLetterQueueTopicName != null; + } + + /** + * If required, return Dead Letter Queue records for the provided exception + * + * @param key Serialized key for the records + * @param value Serialized value for the records + * @param context ErrorHandlerContext of the exception + * @param exception Thrown exception + * @return A list of Dead Letter Queue records to produce + */ + public static List<ProducerRecord<byte[], byte[]>> maybeBuildDeadLetterQueueRecords(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception exception) { + if (!shouldBuildDeadLetterQueueRecord(deadLetterQueueTopicName)) { + return Collections.emptyList(); + } + + return Collections.singletonList(buildDeadLetterQueueRecord(deadLetterQueueTopicName, key, value, context, exception)); + } + + + /** + * Build dead letter queue record for the provided exception. + * + * @param key Serialized key for the record. + * @param value Serialized value for the record. + * @param context error handler context of the exception. + * @return A dead letter queue record to produce. + */ + public static ProducerRecord<byte[], byte[]> buildDeadLetterQueueRecord(final String deadLetterQueueTopicName, + final byte[] key, + final byte[] value, + final ErrorHandlerContext context, + final Exception e) { + if (deadLetterQueueTopicName == null) { + throw new InvalidConfigurationException(String.format("%s cannot be null while building dead letter queue record", StreamsConfig.ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG)); + } + final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(deadLetterQueueTopicName, null, context.timestamp(), key, value); + final StringWriter stackStraceStringWriter = new StringWriter(); + final PrintWriter stackTracePrintWriter = new PrintWriter(stackStraceStringWriter); + e.printStackTrace(stackTracePrintWriter); + + try (final StringSerializer stringSerializer = new StringSerializer()) { + producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_NAME, stringSerializer.serialize(null, e.toString())); + producerRecord.headers().add(HEADER_ERRORS_EXCEPTION_MESSAGE_NAME, stringSerializer.serialize(null, e.getMessage())); + producerRecord.headers().add(HEADER_ERRORS_STACKTRACE_NAME, stringSerializer.serialize(null, stackStraceStringWriter.toString())); Review Comment: Looks valid @Dabz ########## streams/src/test/java/org/apache/kafka/streams/errors/ExceptionHandlerUtilsTest.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.errors; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.header.internals.RecordHeader; +import org.apache.kafka.common.header.internals.RecordHeaders; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.errors.internals.DefaultErrorHandlerContext; +import org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils; +import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; +import org.apache.kafka.streams.state.StateSerdes; +import org.apache.kafka.test.InternalMockProcessorContext; +import org.apache.kafka.test.MockRecordCollector; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.util.Collections; +import java.util.Iterator; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + + +@ExtendWith(MockitoExtension.class) +public class ExceptionHandlerUtilsTest { + @Test + public void checkDealLetterQueueRecords() { Review Comment: Looks valid @Dabz ########## streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/ProcessingExceptionHandlerIntegrationTest.java: ########## @@ -357,7 +357,7 @@ public void shouldStopProcessingWhenProcessingExceptionHandlerReturnsNull() { final StreamsException e = assertThrows(StreamsException.class, () -> inputTopic.pipeInput(eventError.key, eventError.value, Instant.EPOCH)); assertEquals("Fatal user code error in processing error callback", e.getMessage()); assertInstanceOf(NullPointerException.class, e.getCause()); - assertEquals("Invalid ProductionExceptionHandler response.", e.getCause().getMessage()); + assertEquals("Invalid ProcessingExceptionResponse response.", e.getCause().getMessage()); Review Comment: I would drop the second "response" here. Below, it's now just called `Response`. so, is this correct? ########## streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueExceptionHandler.java: ########## @@ -17,47 +17,27 @@ package org.apache.kafka.streams.errors; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.StreamsConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * Deserialization handler that logs a deserialization exception and then * signals the processing pipeline to continue processing more records. */ public class LogAndContinueExceptionHandler implements DeserializationExceptionHandler { private static final Logger log = LoggerFactory.getLogger(LogAndContinueExceptionHandler.class); + private String deadLetterQueueTopic = null; - /** - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ConsumerRecord, Exception)} instead. - */ - @SuppressWarnings("deprecation") - @Deprecated @Override - public DeserializationHandlerResponse handle(final ProcessorContext context, Review Comment: For my understanding - Why can we just remove this? Don't we have to keep it around for compatibility? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java: ########## @@ -329,22 +337,22 @@ private <K, V> void handleException(final ProductionExceptionHandler.Serializati final Integer partition, final Long timestamp, final String processorNodeId, - final InternalProcessorContext<Void, Void> context, + final InternalProcessorContext<?, ?> context, final Exception serializationException) { log.debug(String.format("Error serializing record for topic %s", topic), serializationException); final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers); - final ProductionExceptionHandlerResponse response; + final ProductionExceptionHandler.Response response; try { response = Objects.requireNonNull( - productionExceptionHandler.handleSerializationException( + productionExceptionHandler.handleSerializationError( errorHandlerContext(context, processorNodeId), record, serializationException, origin ), - "Invalid ProductionExceptionHandler response." + "Invalid ProductionExceptionResponse response." Review Comment: ```suggestion "Invalid ProductionExceptionHandler response." ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java: ########## @@ -1713,7 +1715,7 @@ public void shouldThrowStreamsExceptionWhenSerializationFailedAndProductionExcep assertEquals("Fatal user code error in production error callback", exception.getMessage()); assertInstanceOf(NullPointerException.class, exception.getCause()); - assertEquals("Invalid ProductionExceptionHandler response.", exception.getCause().getMessage()); + assertEquals("Invalid ProductionExceptionResponse response.", exception.getCause().getMessage()); Review Comment: ```suggestion assertEquals("Invalid ProductionExceptionHandler response.", exception.getCause().getMessage()); ``` ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java: ########## @@ -220,11 +221,11 @@ public void process(final Record<KIn, VIn> record) { internalProcessorContext.recordContext().sourceRawValue() ); - final ProcessingExceptionHandler.ProcessingHandlerResponse response; + final ProcessingExceptionHandler.Response processingExceptionResponse; try { - response = Objects.requireNonNull( - processingExceptionHandler.handle(errorHandlerContext, record, processingException), - "Invalid ProductionExceptionHandler response." + processingExceptionResponse = Objects.requireNonNull( + processingExceptionHandler.handleError(errorHandlerContext, record, processingException), + "Invalid ProcessingExceptionResponse response." Review Comment: It's now called `Response`. ```suggestion "Invalid ProcessingExceptionHandler response." ``` ########## streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java: ########## @@ -18,38 +18,42 @@ import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.errors.RetriableException; +import org.apache.kafka.streams.StreamsConfig; import java.util.Map; +import static org.apache.kafka.streams.errors.internals.ExceptionHandlerUtils.maybeBuildDeadLetterQueueRecords; + /** * {@code ProductionExceptionHandler} that always instructs streams to fail when an exception * happens while attempting to produce result records. */ public class DefaultProductionExceptionHandler implements ProductionExceptionHandler { - /** - * @deprecated Since 3.9. Use {@link #handle(ErrorHandlerContext, ProducerRecord, Exception)} instead. - */ - @SuppressWarnings("deprecation") - @Deprecated + + private String deadLetterQueueTopic = null; + @Override - public ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record, - final Exception exception) { + public Response handleError(final ErrorHandlerContext context, + final ProducerRecord<byte[], byte[]> record, + final Exception exception) { return exception instanceof RetriableException ? - ProductionExceptionHandlerResponse.RETRY : - ProductionExceptionHandlerResponse.FAIL; + Response.retry() : + Response.fail(maybeBuildDeadLetterQueueRecords(deadLetterQueueTopic, context.sourceRawKey(), context.sourceRawKey(), context, exception)); Review Comment: please check this @Dabz -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org