Andrew, thanks for the details about Consumer internals. That's super useful for this discussion! -- And it confirms my understanding.

I don't think we want to use the ConsumerRecord<Void,Void> type though, because for a DLQ the handler wants to write the message into some DLQ topic, and thus needs the key and value, so only `ConsumerRecord<byte[],byte[]>` would work (or maybe `<ByteBuffer, ByteBuffer>`).

While I would be ok with using `ConsumerRecord`, I don't see a huge advantage compared to passing in all the fields we are interested in one-by-one. In the end, if the data is written into a DLQ topic, the `ConsumerRecord` object cannot be reused (the handler will build a `ProducerRecord` instead), and `ConsumerRecord` would "just" be a container -- I don't think it would simplify user code or provide any other benefit, but would just add an (unnecessary?) level of wrapping/indirection.
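To make that concrete, a hypothetical DLQ handler would do little more than copy fields across. A sketch, assuming raw-bytes deserializers (the topic name and the `sendToDlq` method are made up for illustration):

// Hypothetical DLQ handler: every field of the failed record is read
// exactly once to build the ProducerRecord, so ConsumerRecord would be
// a pure container here.
void sendToDlq(Producer<byte[], byte[]> producer,
               ConsumerRecord<byte[], byte[]> failed) {
    producer.send(new ProducerRecord<>(
        "my-dlq",             // assumed DLQ topic name
        null,                 // partition: let the partitioner decide
        failed.timestamp(),
        failed.key(),
        failed.value(),
        failed.headers()));
}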

The only advantage I see is the case where new interesting metadata fields get added to the message format -- using `ConsumerRecord` would automatically include these new fields, and we would not need to modify the exception class to add them explicitly. But as this happens very rarely, it does not seem to provide a huge benefit.

In the end, I would be fine either way. Curious to hear what others think.


-Matthias



On 4/18/24 8:41 AM, Andrew Schofield wrote:
Hi,
Thanks for the KIP. I think it’s an interesting idea and it seems to work nicely with how the clients work today.

Recently, I’ve been digging into the record deserialization code in the consumer as part of implementing KIP-932. It’s pretty nasty in there.

There are essentially two kinds of problems that can be encountered when converting the response from a Fetch RPC into ConsumerRecords:

1) The batch might be corrupt, so you can’t even work out what records are in there.
2) Individual records cannot be deserialized.

In the second case, the consumer accumulates records from a Fetch response until it hits a record which it cannot deserialize. If it has any parsed records already to return to the application, it parks the RecordDeserializationException and returns the records so far. Then, on the next poll, because there are no parsed records waiting, it throws the RecordDeserializationException. And so it continues until all fetched records are processed.
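From the application’s point of view, the usual pattern for making progress past a bad record is to catch the exception and seek past the offending offset. A minimal sketch (process() stands in for application code, and consumer is a plain KafkaConsumer):

while (true) {
    try {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
        records.forEach(r -> process(r));
    } catch (RecordDeserializationException e) {
        // Skip the record that could not be deserialized and keep consuming.
        consumer.seek(e.topicPartition(), e.offset() + 1);
    }
}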

I don’t really think of org.apache.kafka.common.record.Record as an external interface. I think it’s a bit close to the wire to be putting on a user-facing interface.

We do know almost everything about the bad record at the point where the deserialization fails. I wonder whether we could actually use ConsumerRecord<ByteBuffer, ByteBuffer> or even ConsumerRecord<Void, Void> :) in the constructor for the RecordDeserializationException. I don’t really like having a long list of each of the individual parts of the record like timestamp and so on. It’s nicer to have an interface to the record that we can evolve without having to change the constructor for this exception.
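Concretely, that could look something like this (just a sketch, not a final signature):

public RecordDeserializationException(ConsumerRecord<ByteBuffer, ByteBuffer> record,
                                      String message,
                                      Throwable cause);

public ConsumerRecord<ByteBuffer, ByteBuffer> record();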

Thanks,
Andrew

On 18 Apr 2024, at 15:13, Frédérik Rouleau <froul...@confluent.io.INVALID> wrote:

Hi,

But I guess my main question is really about what metadata we really
want to add to `RecordDeserializationException`? `Record` exposes all
kinds of internal (serialization) metadata like `keySize()`,
`valueSize()` and many more. For the DLQ use-case it seems we don't
really want any of these? So I am wondering if just adding
key/value/ts/headers would be sufficient?


I think that key/value/ts/headers, topicPartition and offset are all we need. I do not see any usage for the other metadata. If someone has a use case, I would like to hear it.

So in that case we can directly add the data into the exception. We can keep ByteBuffer for the local fields instead of byte[], which will avoid a memory allocation if users do not require the data.
I wonder if we should return the ByteBuffer, or directly the byte[] (or both?), which would be more convenient for end users. Any thoughts?
Then we can have something like:

public RecordDeserializationException(TopicPartition partition,
                                     long offset,
                                     ByteBuffer key,
                                     ByteBuffer value,
                                     Header[] headers,
                                     long timestamp,
                                     String message,
                                     Throwable cause);

public TopicPartition topicPartition();

public long offset();

public long timestamp();

public byte[] key(); // Will allocate the array on call

public byte[] value(); // Will allocate the array on call

public Header[] headers();
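To make the "will allocate on call" point concrete, key() could be implemented roughly like this (sketch only; Utils.toArray is the helper in org.apache.kafka.common.utils, which copies the bytes without consuming the buffer):

private final ByteBuffer keyBuffer; // stored as-is, no copy at construction time

public byte[] key() {
    // Copy into a fresh array only when the caller asks for it, so
    // callers that never look at the key pay no allocation cost.
    return keyBuffer == null ? null : Utils.toArray(keyBuffer);
}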



Regards,
Fred
