Hi,
Thanks for the KIP. I think it’s an interesting idea, and it seems to fit nicely with how
the clients work today.

Recently, I’ve been digging into the record deserialization code in the consumer as
part of implementing KIP-932. It’s pretty nasty in there.

There are essentially two kinds of problems that can be encountered when
converting the response from a Fetch RPC into ConsumerRecords.

1) The batch might be corrupt, so you can’t even work out what records are in there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until
it hits a record which it cannot deserialize. If it has any parsed records already
to return to the application, it parks the RecordDeserializationException and
returns the records so far. Then, on the next poll, because there are no parsed
records waiting, it throws the RecordDeserializationException. And so it continues
until all fetched records are processed.
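A rough, stdlib-only model of this park-then-throw flow, to make it concrete. It is not the consumer’s actual code: `FetchedBatch`, `DeserializationFailure` and `seek` are hypothetical names, and `seek` stands in for the application moving past the bad offset (as it would with `KafkaConsumer#seek`). In this sketch nothing advances past the failing record on its own, so polling again without seeking fails at the same offset.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for RecordDeserializationException: carries the failing offset.
class DeserializationFailure extends RuntimeException {
    final long offset;

    DeserializationFailure(long offset, Throwable cause) {
        super("Error deserializing record at offset " + offset, cause);
        this.offset = offset;
    }
}

// Hypothetical model of one fetched batch being handed out across successive polls.
class FetchedBatch<T> {
    private final long baseOffset;
    private final List<byte[]> raw;
    private final Function<byte[], T> deserializer;
    private int position = 0;              // index of the next raw record to deserialize
    private DeserializationFailure parked; // failure deferred until the next poll

    FetchedBatch(long baseOffset, List<byte[]> raw, Function<byte[], T> deserializer) {
        this.baseOffset = baseOffset;
        this.raw = raw;
        this.deserializer = deserializer;
    }

    List<T> poll() {
        // A failure parked by the previous poll is thrown before anything else is parsed.
        if (parked != null) {
            DeserializationFailure failure = parked;
            parked = null;
            throw failure;
        }
        List<T> parsed = new ArrayList<>();
        while (position < raw.size()) {
            long offset = baseOffset + position;
            try {
                parsed.add(deserializer.apply(raw.get(position)));
                position++;
            } catch (RuntimeException e) {
                DeserializationFailure failure = new DeserializationFailure(offset, e);
                if (parsed.isEmpty()) {
                    throw failure; // nothing to hand back, so fail immediately
                }
                parked = failure;  // return what we have; throw on the next poll
                break;
            }
        }
        return parsed;
    }

    // Hypothetical: models the application skipping a bad record, cf. KafkaConsumer#seek.
    void seek(long offset) {
        position = (int) (offset - baseOffset);
        parked = null;
    }
}
```

With records `a`, `b`, an undecodable record and `c` at offsets 100 to 103, the first poll returns `[a, b]`, the second throws for offset 102, and after `seek(103)` the next poll returns `[c]`.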

I don’t really think of org.apache.kafka.common.record.Record as an external interface.
It’s a bit too close to the wire to put on a user-facing interface.

We do know almost everything about the bad record at the point where
deserialization fails. I wonder whether we could actually use
ConsumerRecord<ByteBuffer, ByteBuffer> or even ConsumerRecord<Void, Void> :)
in the constructor for RecordDeserializationException. I don’t really like having
a long list of constructor parameters, one for each part of the record (timestamp
and so on). It’s nicer to have an interface to the record that we can evolve
without having to change the constructor for this exception.
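A rough sketch of that shape, written so it compiles without kafka-clients. `RawRecordView` is a hypothetical stand-in for `ConsumerRecord<ByteBuffer, ByteBuffer>` and `BadRecordException` a hypothetical stand-in for `RecordDeserializationException`; neither is Kafka API. The exception takes one record object, so adding a field later touches only the record type. The record keeps the raw `ByteBuffer`s and copies into a `byte[]` only when asked. Headers are omitted for brevity.

```java
import java.nio.ByteBuffer;

// Hypothetical stand-in for ConsumerRecord<ByteBuffer, ByteBuffer>. New accessors can be
// added here later without changing the exception's constructor.
final class RawRecordView {
    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final ByteBuffer key;   // may be null: records are not required to have a key
    private final ByteBuffer value; // may be null, e.g. for tombstones

    RawRecordView(String topic, int partition, long offset, long timestamp,
                  ByteBuffer key, ByteBuffer value) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }

    String topic() { return topic; }
    int partition() { return partition; }
    long offset() { return offset; }
    long timestamp() { return timestamp; }

    // Zero-copy, read-only views over the retained buffers.
    ByteBuffer keyBuffer() { return key == null ? null : key.asReadOnlyBuffer(); }
    ByteBuffer valueBuffer() { return value == null ? null : value.asReadOnlyBuffer(); }

    // Copies into a fresh array on every call, so only callers that want byte[] pay for it.
    byte[] keyBytes() { return copy(key); }
    byte[] valueBytes() { return copy(value); }

    private static byte[] copy(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        ByteBuffer view = buffer.duplicate(); // own position/limit, so the original is not consumed
        byte[] bytes = new byte[view.remaining()];
        view.get(bytes);
        return bytes;
    }
}

// Hypothetical exception shape: one record argument instead of a parameter per field.
class BadRecordException extends RuntimeException {
    private final RawRecordView failedRecord;

    BadRecordException(RawRecordView failedRecord, String message, Throwable cause) {
        super(message, cause);
        this.failedRecord = failedRecord;
    }

    RawRecordView failedRecord() { return failedRecord; }
}
```

A DLQ handler could then catch the exception and forward `failedRecord().topic()`, `offset()` and the raw bytes without the exception’s constructor ever growing.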

Thanks,
Andrew

> On 18 Apr 2024, at 15:13, Frédérik Rouleau <froul...@confluent.io.INVALID> wrote:
> 
> Hi,
> 
>> But I guess my main question is really about what metadata we really
>> want to add to `RecordDeserializationException`? `Record` exposes all
>> kinds of internal (serialization) metadata like `keySize()`,
>> `valueSize()` and many more. For the DLQ use-case it seems we don't
>> really want any of these? So I am wondering if just adding
>> key/value/ts/headers would be sufficient?
>> 
> 
> I think that key/value/ts/headers, topicPartition and offset are all we
> need. I do not see any usage for other metadata. If someone has a use case,
> I would like to know it.
> 
> So in that case we can add the data directly to the exception. We can
> keep a ByteBuffer in the exception's field instead of a byte[], which avoids
> allocating memory if users do not need the bytes.
> I wonder whether we should return the ByteBuffer or the byte[] directly (or
> both?), whichever is more convenient for end users. Any thoughts?
> Then we can have something like:
> 
> public RecordDeserializationException(TopicPartition partition,
>                                     long offset,
>                                     ByteBuffer key,
>                                     ByteBuffer value,
>                                     Header[] headers,
>                                     long timestamp,
>                                     String message,
>                                     Throwable cause);
> 
> public TopicPartition topicPartition();
> 
> public long offset();
> 
> public long timestamp();
> 
> public byte[] key(); // Will allocate the array on call
> 
> public byte[] value(); // Will allocate the array on call
> 
> public Header[] headers();
>
> Regards,
> Fred
