Hi Arpit,

Thanks for picking this up. A few questions before this goes to VOTE.

Nit: the discussion thread link on the KIP is incorrect; can you fix it?

LB1: How does this interact with EOS? Under EOS, the normal DLQ write is part of the same transaction as the failing record's offset commit (via StreamsProducer.maybeBeginTransaction()). The global thread has no consumer-group offset commit, and the checkpoint file sits outside any Kafka transaction. Do we run the global producer at-least-once even under EOS, or do we wrap DLQ sends in a transaction? If the latter, how do we handle ordering against maybeCheckpoint(), the new transactional.id, and fencing semantics that don't exist for the global thread today?

LB2: What does the producer config look like, in particular client.id and transactional.id? How do we avoid colliding with the per-thread task producers under EOS?

LB3: What's the shutdown ordering for the new producer, and what happens if sendException fires (e.g. an authorization failure on the DLQ topic)?

LB4: processing.exception.handler.global.enabled is already deprecated. Is the semantic change from "drop with warning" to "produce to DLQ" called out in the Compatibility section, and does the deprecation javadoc still reflect what the config does?

LB5: How does this interact with the deserialization handler path? RecordDeserializer.handleDeserializationFailure casts the context to RecordCollector.Supplier unconditionally, and GlobalProcessorContextImpl isn't one today, so a deserialization handler that returns DLQ records during global-state restoration currently hits a ClassCastException instead of warning and dropping. Does the KIP intend to cover this case too? It seems to fall out for free once GlobalProcessorContextImpl becomes a RecordCollector.Supplier.

LB6: What test scenarios are planned? For comparison, DeadLetterQueueIntegrationTest on the normal path covers DSL/Processor API x FAIL/CONTINUE, plus a deserialization variant. Are we mirroring that coverage for the global thread?
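
To make the LB5 failure mode concrete, here is a rough, simplified sketch. Every type in it is a hypothetical stand-in, not the real Kafka Streams class: RecordCollectorSupplier plays RecordCollector.Supplier, StreamThreadContext plays ProcessorContextImpl, GlobalContext plays today's GlobalProcessorContextImpl, and GlobalContextAsSupplier models the change I'd expect the KIP to make. Only the unconditional cast is meant to mirror what handleDeserializationFailure does.

```java
// Hypothetical stand-ins for the Kafka Streams internals discussed in LB5.
// None of these are the real classes; they only reproduce the cast behavior.
public class GlobalDlqCastSketch {

    // Stand-in for RecordCollector.Supplier.
    interface RecordCollectorSupplier {
        String recordCollector();
    }

    // Stand-in for the common processor context type.
    interface ProcessorContextLike { }

    // Stand-in for ProcessorContextImpl, which already supplies a collector.
    static final class StreamThreadContext implements ProcessorContextLike, RecordCollectorSupplier {
        @Override
        public String recordCollector() {
            return "stream-thread collector";
        }
    }

    // Stand-in for GlobalProcessorContextImpl as it is today: no collector.
    static final class GlobalContext implements ProcessorContextLike { }

    // Stand-in for GlobalProcessorContextImpl once it becomes a supplier.
    static final class GlobalContextAsSupplier implements ProcessorContextLike, RecordCollectorSupplier {
        @Override
        public String recordCollector() {
            return "global collector";
        }
    }

    // Mirrors the unconditional cast on the deserialization-failure path:
    // no instanceof check, so a non-supplier context throws ClassCastException.
    static String collectorForDlq(ProcessorContextLike context) {
        return ((RecordCollectorSupplier) context).recordCollector();
    }

    public static void main(String[] args) {
        System.out.println(collectorForDlq(new StreamThreadContext()));
        try {
            collectorForDlq(new GlobalContext());
        } catch (ClassCastException e) {
            System.out.println("global context today: ClassCastException");
        }
        System.out.println(collectorForDlq(new GlobalContextAsSupplier()));
    }
}
```

The last call is the "falls out for free" case: once the global context implements the supplier interface, the existing cast succeeds without touching the deserializer.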

LB7: Small naming nit: the KIP introduces a recordCollector field on GlobalProcessorContextImpl, but the analogous field on ProcessorContextImpl is just called collector. Any reason not to match?

Cheers,
Lucas
