Hi,

Thanks for the KIP. I think it’s an interesting idea and it seems to work nicely with how the clients work today.

Recently, I’ve been digging in to the record deserialization code in the consumer as part of implementing KIP-932. It’s pretty nasty in there. There are essentially two kinds of problems that can be encountered when converting the response from a Fetch RPC into ConsumerRecords.

1) The batch might be corrupt, so you can’t even work out what records are in there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until it hits a record which it cannot deserialize. If it has any parsed records already to return to the application, it parks the RecordDeserializationException and returns the records so far. Then, on the next poll, because there are no parsed records waiting, it throws the RecordDeserializationException. And so it continues until all fetched records are processed.

I don’t really think of org.apache.kafka.common.record.Record as an external interface. I think it’s a bit close to the wire to be putting on a user-facing interface.

We do know almost everything about the bad record at the point where the deserialization fails. I wonder whether actually we could use ConsumerRecord<ByteBuffer, ByteBuffer> or even ConsumerRecord<Void, Void> :) in the constructor for the RecordDeserializationException. I don’t really like having a long list of each of the individual items of the record’s parts like timestamp and so on. It’s nicer to have an interface to the record that we can evolve without having to change the constructor for this exception.

Thanks,
Andrew

> On 18 Apr 2024, at 15:13, Frédérik Rouleau <froul...@confluent.io.INVALID> wrote:
>
> Hi,
>
>> But I guess my main question is really about what metadata we really
>> want to add to `RecordDeserializationException`? `Record` expose all
>> kind of internal (serialization) metadata like `keySize()`,
>> `valueSize()` and many more. For the DLQ use-case it seems we don't
>> really want any of these? So I am wondering if just adding
>> key/value/ts/headers would be sufficient?
>>
>
> I think that key/value/ts/headers, topicPartition and offset are all we
> need. I do not see any usage for other metadata. If someone has a use case,
> I would like to know it.
>
> So in that case we can directly add the data into the exception. We can
> keep ByteBuffer for the local field instead of byte[], that will avoid
> memory allocation if users do not require it.
> I wonder if we should return the ByteBuffer or directly the byte[] (or both
> ?) which is more convenient for end users. Any thoughts?
> Then we can have something like:
>
> public RecordDeserializationException(TopicPartition partition,
>                                       long offset,
>                                       ByteBuffer key,
>                                       ByteBuffer value,
>                                       Header[] headers,
>                                       long timestamp,
>                                       String message,
>                                       Throwable cause);
>
> public TopicPartition topicPartition();
>
> public long offset();
>
> public long timestamp();
>
> public byte[] key(); // Will allocate the array on call
>
> public byte[] value(); // Will allocate the array on call
>
> public Header[] headers();
>
>
>
> Regards,
> Fred
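
As a rough illustration of the single-record constructor idea discussed in this thread, the sketch below contrasts with the field-by-field constructor quoted above. It is only a sketch under assumptions, not the KIP's API: RawRecord is a hypothetical stand-in for ConsumerRecord<ByteBuffer, ByteBuffer> so that the snippet compiles without the Kafka client on the classpath, and RawRecordDeserializationException, toBytes and describe are invented names. Headers are left out for brevity.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for ConsumerRecord<ByteBuffer, ByteBuffer>; not the real Kafka class.
final class RawRecord {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;
    private final ByteBuffer key;
    private final ByteBuffer value;

    RawRecord(String topic, int partition, long offset, long timestamp,
              ByteBuffer key, ByteBuffer value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    // Read-only views with their own position, so callers cannot disturb the stored buffers.
    ByteBuffer key() { return key == null ? null : key.asReadOnlyBuffer(); }
    ByteBuffer value() { return value == null ? null : value.asReadOnlyBuffer(); }
}

// Hypothetical exception shape: one record argument instead of a long list of fields.
final class RawRecordDeserializationException extends RuntimeException {
    private final RawRecord record;

    RawRecordDeserializationException(RawRecord record, String message, Throwable cause) {
        super(message, cause);
        this.record = record;
    }

    RawRecord record() { return record; }

    // Copies the remaining bytes only when the caller asks for a byte[].
    static byte[] toBytes(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        ByteBuffer copy = buffer.duplicate();
        byte[] bytes = new byte[copy.remaining()];
        copy.get(bytes);
        return bytes;
    }
}

final class DlqSketchDemo {
    // What a dead-letter-queue handler might log about the failed record.
    static String describe(RawRecordDeserializationException e) {
        RawRecord r = e.record();
        String key = new String(RawRecordDeserializationException.toBytes(r.key()),
                                StandardCharsets.UTF_8);
        int valueBytes = RawRecordDeserializationException.toBytes(r.value()).length;
        return r.topic + "-" + r.partition + "@" + r.offset
                + " key=" + key + " valueBytes=" + valueBytes;
    }

    public static void main(String[] args) {
        RawRecord bad = new RawRecord("orders", 0, 42L, 1713449580000L,
                ByteBuffer.wrap("k1".getBytes(StandardCharsets.UTF_8)),
                ByteBuffer.wrap("not-json".getBytes(StandardCharsets.UTF_8)));
        try {
            throw new RawRecordDeserializationException(bad, "value could not be deserialized", null);
        } catch (RawRecordDeserializationException e) {
            System.out.println(describe(e));
        }
    }
}
```

With this shape, adding another piece of record metadata later would only touch the record type, not the exception's constructor, and the byte[] copy is made only for callers that want one.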