cadonna commented on PR #18739: URL: https://github.com/apache/kafka/pull/18739#issuecomment-2697493506
@loicgreffier The first question we need to answer is the following: should the raw input record be (a) the record from which the defective record that triggered the handler was derived, or (b) the record that caused the defective record to be sent downstream?

If we decide for (a), we need to define what the input record is for each DSL operation. For example, which input record should be written to the dead letter queue when an aggregation result triggers the error handler? Intuitively, I would say it is the last record that contributed to the aggregation. However, there are cases where this is not true, for example when the aggregate is a list of accumulated records and the last record is not the one responsible for the error downstream (see the first sketch below). With (a), we would ideally need to carry the raw input record everywhere: into state stores, changelogs, caches, and buffers. I checked the code, and I believe we also have an issue with caches, because when an entry is added to the cache during a get operation, the record context is populated with sentinel values like `-1` and `null` (see the second sketch below): https://github.com/apache/kafka/blob/f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L372

If we decide for (b), maintaining the raw input record might be a bit simpler, but I am not sure how useful the raw input record is in this case: you cannot really understand why something went wrong, nor replay the failing input.

Let me know what you think.
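
To make the aggregation example concrete, here is a minimal sketch of such a topology. The topic names, the size limit, and the list-as-string representation are all hypothetical, chosen only to illustrate the problem; this is not code from the PR.

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;

public class AggregateDlqExample {

    // Toy topology: the aggregate accumulates all values per key into a
    // comma-separated list, and the downstream mapValues fails once the list
    // grows too large. The record that triggers the failure is merely the
    // most recent input; the records that made the list too long arrived
    // earlier. Writing only the last raw input record to the dead letter
    // queue would therefore neither explain the failure nor allow a replay.
    public static Topology build() {
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                () -> "",
                (key, value, agg) -> agg.isEmpty() ? value : agg + "," + value,
                Materialized.with(Serdes.String(), Serdes.String()))
            .toStream()
            .mapValues(list -> {
                if (list.length() > 1000) {
                    // Hypothetical failure that is a property of the whole
                    // accumulated list, not of the last record alone.
                    throw new IllegalStateException("accumulated list too large");
                }
                return list;
            })
            .to("output");
        return builder.build();
    }
}
```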

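And here is a simplified model of the cache issue. The real classes (`CachingKeyValueStore`, `LRUCacheEntry`) are internal to Kafka Streams, so this is only a paraphrase of the code path linked above, not the actual implementation:

```java
import java.util.Objects;

// Simplified model of a cache entry created on a read-through get: only the
// raw value fetched from the underlying store is available at that point, so
// the record context is filled with sentinels. Any attempt to carry the raw
// input record through the cache would lose the lineage here.
final class CacheEntrySketch {
    final byte[] value;
    final String topic;     // sentinel: null, originating record unknown
    final int partition;    // sentinel: -1
    final long offset;      // sentinel: -1
    final long timestamp;   // sentinel: -1

    CacheEntrySketch(final byte[] value) {
        this.value = Objects.requireNonNull(value);
        this.topic = null;
        this.partition = -1;
        this.offset = -1;
        this.timestamp = -1;
    }
}
```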