loicgreffier commented on PR #18739: URL: https://github.com/apache/kafka/pull/18739#issuecomment-2729270055
@cadonna KIP-1034 suggests that the input record should be the one that triggered the sub-topology: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1034%3A+Dead+letter+queue+in+Kafka+Streams#KIP1034:DeadletterqueueinKafkaStreams-DefaultDeadletterqueuerecord

> (a) the record from which the defective record that triggered the handler was derived from

(a) aligns with the definition in KIP-1034, doesn't it? It makes it easier to reprocess records.

> (b) the record that triggered the defective record to be sent downstream?

Does this mean that in the following example:

```java
.stream()
.selectKey()
.mapValues() <----------- Exception here
...
```

the record sent to the DLQ would be the input record of the `mapValues` processor (i.e., the one passed to the processing error handler)?

---

> Intuitively, I would say it is the last record that contributed to the aggregation.

```java
.stream(INPUT_TOPIC)
.selectKey()
.groupByKey()
.aggregate(
    initializer,
    aggregator, <----------- Exception here
    materialized
)
...
```

With the current implementation:
- The source record is the last record that contributed to the aggregation.
- The source topic is the repartition topic.
- The default DLQ topic, specified by `errors.dead.letter.queue.topic.name`, may contain records from different sub-topologies (e.g., records from `INPUT_TOPIC` or the repartition topic).

---

> I believe, we have an issue also with caches since I realized that when an entry is added to the cache during a get operation

I will check and reproduce this case.

@sebastienviale Feel free to add anything if I missed a point.
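To make the aggregation case concrete, here is a plain-Java sketch (no Kafka dependency, all names hypothetical) of the metadata question above: when the aggregator in the downstream sub-topology throws, the record available to the handler is the one read from the repartition topic, so a DLQ record built from it points at the repartition topic rather than at `INPUT_TOPIC`.

```java
// Plain-Java sketch of which source metadata ends up in the DLQ record.
// SourceRecord and DlqRecord are hypothetical stand-ins, not Kafka Streams types.
public class DlqMetadataSketch {
    record SourceRecord(String topic, long offset, String key, String value) {}
    record DlqRecord(String sourceTopic, long sourceOffset, String key, String value) {}

    // Builds the DLQ record from the record the failing task was processing.
    static DlqRecord toDlqRecord(SourceRecord failing) {
        return new DlqRecord(failing.topic(), failing.offset(), failing.key(), failing.value());
    }

    public static void main(String[] args) {
        // The record that entered the topology from the user's input topic...
        SourceRecord original = new SourceRecord("INPUT_TOPIC", 42L, "k1", "v1");
        // ...is re-keyed and re-read from the repartition topic before aggregation,
        // so the failing task only sees this record:
        SourceRecord repartitioned =
                new SourceRecord("app-id-repartition", 7L, "newKey", "v1");

        DlqRecord dlq = toDlqRecord(repartitioned);
        // The DLQ record points at the repartition topic, which is what makes
        // replaying from INPUT_TOPIC harder with the current implementation.
        System.out.println(dlq.sourceTopic()); // prints "app-id-repartition"
    }
}
```

Since a single default DLQ topic receives records built this way from every sub-topology, consumers of that topic see a mix of topics (`INPUT_TOPIC`, repartition topics, ...) in the source metadata, which is the point raised above.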
