cadonna commented on PR #18739:
URL: https://github.com/apache/kafka/pull/18739#issuecomment-2697493506

   @loicgreffier 
   
   The first question we need to answer is the following:
   Should the raw input record be (a) the record from which the defective record that triggered the handler was derived, or (b) the record that caused the defective record to be sent downstream?
   
   If we decide for (a), we need to define the input record for each DSL operation. For example, what is the input record that should be written to the dead letter queue when an aggregation result triggers the error handler? Intuitively, I would say it is the last record that contributed to the aggregation. However, there are cases where this is not true, for example, when the aggregate is a list of accumulated records and the last record is not the one responsible for the error downstream (see the sketch below). With (a), we would ideally need to carry the raw input record everywhere: into state stores, changelogs, caches, and buffers.
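   To make the list-aggregation case concrete, here is a minimal sketch of such a topology; the topic names and serdes are made up for illustration and this is not code from the PR:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;

public class LastContributorIsNotTheCulprit {

    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        // Accumulate every value seen for a key into a list.
        final KTable<String, List<String>> aggregate = builder
            .<String, String>stream("input-topic")
            .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
            .aggregate(
                ArrayList::new,
                (key, value, list) -> { list.add(value); return list; },
                Materialized.with(Serdes.String(), Serdes.ListSerde(ArrayList.class, Serdes.String())));

        // The downstream operation touches the whole aggregate. If a
        // non-numeric element such as "abc" was added many records ago,
        // the *last* record that updated the aggregate trips the parse
        // error here without being the record that caused it.
        aggregate.toStream()
            .mapValues(list -> list.stream().mapToInt(Integer::parseInt).sum())
            .to("output-topic", Produced.with(Serdes.String(), Serdes.Integer()));

        builder.build(); // topology only; starting KafkaStreams is omitted
    }
}
```

   Here, the record that finally trips `Integer.parseInt` is just the latest update to the aggregate; the poisonous element may have entered the list arbitrarily many records earlier, and its raw input record is long gone by the time the handler fires.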
   I checked the code, and I believe we also have an issue with caches: when an entry is added to the cache during a get operation, the record context is populated with sentinel values like `-1` and `null`:

https://github.com/apache/kafka/blob/f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L372
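   To show why those sentinels matter here, a purely illustrative sketch follows; `IllustrativeContext` is a made-up stand-in for the internal record context, not the actual class from the linked file:

```java
// Hypothetical stand-in for the internal record context carried by a
// cache entry; the real class lives in Streams internals and has more
// fields. Used here only to show the consequence of the sentinels.
record IllustrativeContext(String topic, int partition, long offset, long timestamp) {}

public class SentinelContextDemo {
    public static void main(final String[] args) {
        // An entry created on the read path has no originating record,
        // so its context ends up holding only sentinel values.
        final IllustrativeContext fromReadThrough = new IllustrativeContext(null, -1, -1L, -1L);

        // A dead letter queue record derived from this context cannot
        // point back to a raw input record; there is nothing to look up.
        System.out.printf("raw input = %s[%d]@%d%n",
            fromReadThrough.topic(), fromReadThrough.partition(), fromReadThrough.offset());
        // prints: raw input = null[-1]@-1
    }
}
```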
      
   
   If we decide for (b), maintaining the raw input record might be a bit simpler, but I am not sure how useful the raw input record is in that case. You cannot really understand from it why something went wrong, nor meaningfully replay it: in the aggregation example above, the record that triggers the failure may be perfectly fine on its own, while the element that actually breaks the downstream operation entered the aggregate much earlier.
   
   Let me know what you think. 
   

