Ankur Sinha created KAFKA-20604:
-----------------------------------

             Summary: KIP-1348: Complete Built-in Exception Handler Symmetry with LogAndContinueProductionExceptionHandler
                 Key: KAFKA-20604
                 URL: https://issues.apache.org/jira/browse/KAFKA-20604
             Project: Kafka
          Issue Type: New Feature
          Components: streams
            Reporter: Ankur Sinha


This ticket tracks the implementation of 
[KIP-1348|https://cwiki.apache.org/confluence/display/KAFKA/KIP-1348%3A+Complete+Built-in+Exception+Handler+Symmetry+with+LogAndContinueProductionExceptionHandler].

h3. Problem

Kafka Streams ships built-in log-and-continue exception handlers for 
deserialization ({{LogAndContinueExceptionHandler}}) and processing 
({{LogAndContinueProcessingExceptionHandler}}), but not for 
production/serialization errors. Users who want to skip poison-pill records on 
the output side must write a custom {{ProductionExceptionHandler}} — something 
not required for the other two error types.

Additionally, DLQ records produced by any exception handler are currently sent 
through the same {{RecordCollectorImpl.send()}} path as regular records. With a 
continue handler configured, a failed DLQ send causes one of two problems:
* An *infinite loop* in the production path: the DLQ failure invokes the 
handler again, which produces another DLQ record, and so on.
* *Silent data loss* in the deserialization/processing paths: the DLQ failure 
is swallowed by the continue handler, and the original record is lost.
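
The production-path loop can be illustrated with a toy model. This is a sketch only: {{SharedPathCollector}} and its members are hypothetical stand-ins, not Kafka Streams classes, and the attempt cap exists solely so the demo terminates (the real loop has no such cap). It assumes that both the output topic and the DLQ topic reject every record.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model, not Kafka code: regular and DLQ records share one send path.
final class SharedPathCollector {
    static final String DLQ_TOPIC = "my-app-dlq";

    final List<String> attemptedTopics = new ArrayList<>();
    private final Set<String> failingTopics;
    private final int attemptCap;

    SharedPathCollector(Set<String> failingTopics, int attemptCap) {
        this.failingTopics = failingTopics;
        this.attemptCap = attemptCap;
    }

    void send(String topic, String value) {
        if (attemptedTopics.size() >= attemptCap) {
            return; // demo-only guard so the recursion stops
        }
        attemptedTopics.add(topic);
        if (failingTopics.contains(topic)) {
            // A continue-style handler resumes and emits a DLQ record,
            // which re-enters the same path and can fail the same way.
            send(DLQ_TOPIC, value);
        }
    }
}

final class DlqLoopDemo {
    public static void main(String[] args) {
        SharedPathCollector collector = new SharedPathCollector(
            Set.of("output", SharedPathCollector.DLQ_TOPIC), 5);
        collector.send("output", "poison-pill");
        // prints [output, my-app-dlq, my-app-dlq, my-app-dlq, my-app-dlq]
        System.out.println(collector.attemptedTopics);
    }
}
```

Every attempt after the first targets the DLQ topic, so without the cap the sequence would never end.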

h3. Changes

This KIP makes three changes:

# *{{LogAndContinueProductionExceptionHandler}}* — new built-in handler that 
logs at WARN level and returns {{Response.resume()}}. Supports DLQ via 
{{errors.dead.letter.queue.topic.name}}.
# *{{LogAndFailProductionExceptionHandler}}* — rename of 
{{DefaultProductionExceptionHandler}} for naming consistency. 
{{DefaultProductionExceptionHandler}} is deprecated as a subclass alias.
# *{{sendDlqRecord()}}* method on {{RecordCollector}} interface — dedicated DLQ 
send path that bypasses the production exception handler, preventing the 
infinite loop and silent data loss described above. All four DLQ call sites 
({{RecordCollectorImpl.handleException()}}, 
{{RecordCollectorImpl.recordSendError()}}, 
{{RecordDeserializer.handleDeserializationFailure()}}, and {{StreamTask}} 
processing error handler) are updated to use it.
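
The idea behind the dedicated path can be sketched with a toy model. Nothing here is the actual Kafka Streams code: {{DedicatedDlqCollector}} is hypothetical, the real {{sendDlqRecord()}} signature is not given in this ticket, and surfacing a DLQ failure as an exception is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Toy model, not Kafka code: DLQ records get their own send path.
final class DedicatedDlqCollector {
    static final String DLQ_TOPIC = "my-app-dlq";

    final List<String> attemptedTopics = new ArrayList<>();
    private final Set<String> failingTopics;

    DedicatedDlqCollector(Set<String> failingTopics) {
        this.failingTopics = failingTopics;
    }

    // Regular path: a failure consults the (continue-style) handler,
    // which asks for a DLQ record and then resumes.
    void send(String topic, String value) {
        attemptedTopics.add(topic);
        if (failingTopics.contains(topic)) {
            sendDlqRecord(value);
        }
    }

    // Dedicated path: never calls back into the handler, so a DLQ
    // failure cannot trigger another DLQ record or be swallowed.
    void sendDlqRecord(String value) {
        attemptedTopics.add(DLQ_TOPIC);
        if (failingTopics.contains(DLQ_TOPIC)) {
            // Assumption: the failure is surfaced to the caller.
            throw new IllegalStateException("DLQ send failed for: " + value);
        }
    }
}
```

With both topics failing, this model makes exactly two attempts (one regular, one DLQ) and then reports the failure instead of looping.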

h3. Configuration

No new configuration keys. Users opt in via existing configs:
{code}
production.exception.handler=org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler
errors.dead.letter.queue.topic.name=my-app-dlq
{code}
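
For applications that build their configuration in code, the same two settings could be applied as in the sketch below. The config keys and handler class name are taken from this ticket; {{ContinueHandlerConfig}} is a hypothetical helper, and the {{application.id}} and {{bootstrap.servers}} values are placeholders. The handler class will only resolve on a release that includes KIP-1348.

```java
import java.util.Properties;

// Hypothetical helper that assembles Streams properties with the new handler.
final class ContinueHandlerConfig {
    static Properties build() {
        Properties props = new Properties();
        props.put("application.id", "my-app");            // placeholder
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        // Opt in to log-and-continue on production errors.
        props.put("production.exception.handler",
            "org.apache.kafka.streams.errors.LogAndContinueProductionExceptionHandler");
        // Route skipped records to a dead letter queue topic.
        props.put("errors.dead.letter.queue.topic.name", "my-app-dlq");
        return props;
    }
}
```

The resulting {{Properties}} object would then be passed to the {{KafkaStreams}} constructor in the usual way.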

h3. Compatibility

Fully backward compatible. Default behavior unchanged. 
{{DefaultProductionExceptionHandler}} remains as a deprecated alias. The 
internal default class reference in {{StreamsConfig}} is updated from 
{{DefaultProductionExceptionHandler}} to 
{{LogAndFailProductionExceptionHandler}} to avoid {{-Werror}} deprecation 
compile errors; runtime behavior is identical.
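
The deprecated-alias pattern can be sketched as follows. The two classes are hypothetical stand-ins for {{LogAndFailProductionExceptionHandler}} and {{DefaultProductionExceptionHandler}}; the real classes implement {{ProductionExceptionHandler}}, and the string return value is a simplification.

```java
// Stand-in for the renamed handler, which carries the actual behavior.
class LogAndFailSketch {
    String handle(String record) {
        return "FAIL";
    }
}

/** @deprecated kept only so existing references keep working. */
@Deprecated
class DefaultSketch extends LogAndFailSketch {
    // Intentionally empty: all behavior is inherited from the new name.
}
```

Because the old name adds nothing of its own, code that still refers to it behaves identically; only the compiler's deprecation warning differs, which is why a build using {{-Werror}} has to reference the new name internally.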

h3. Files Changed

*New files:*
* {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndContinueProductionExceptionHandler.java}}
* {{streams/src/main/java/org/apache/kafka/streams/errors/LogAndFailProductionExceptionHandler.java}}

*Modified source:*
* {{streams/src/main/java/org/apache/kafka/streams/errors/DefaultProductionExceptionHandler.java}}: deprecated, extends {{LogAndFailProductionExceptionHandler}}
* {{streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java}}: default class reference updated
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java}}: added {{sendDlqRecord()}}
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java}}: implemented {{sendDlqRecord()}}, updated DLQ send in {{handleException()}} and {{recordSendError()}}
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java}}: DLQ send updated to {{sendDlqRecord()}}
* {{streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java}}: DLQ send updated to {{sendDlqRecord()}}

*Modified tests:*
* {{streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java}}: renamed existing DLQ tests, added 2 new LogAndContinue DLQ tests
* {{streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java}}: added default handler test
* {{streams/src/test/java/org/apache/kafka/test/MockRecordCollector.java}}: added {{sendDlqRecord()}} mock

*Modified docs:*
* {{docs/streams/upgrade-guide.md}}: added 4.4.0 section
* {{docs/streams/developer-guide/config-streams.md}}: updated handler references



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
