Andrew, thanks for the details about Consumer internals. That's super
useful for this discussion! -- And it confirms my understanding.
I don't think we want to use the `ConsumerRecord<Void,Void>` type though,
because for a DLQ the handler wants to write the message into some DLQ
topic, and thus needs the key and value, so only
`ConsumerRecord<byte[],byte[]>` would work (or maybe
`ConsumerRecord<ByteBuffer,ByteBuffer>`).
While I would be ok with using `ConsumerRecord`, I don't see a huge
advantage compared to passing in all fields we are interested in
one-by-one. In the end, if the data is written into a DLQ topic, the
`ConsumerRecord` object cannot be reused (but the handler will build a
`ProducerRecord`), and `ConsumerRecord` would "just" be a container -- I
don't think it would simplify user code or provide any other benefit,
but would just add an (unnecessary?) level of wrapping/indirection.
The only advantage I see is for the case that new interesting metadata
fields get added to the message format -- in that case, using
`ConsumerRecord` would automatically include these new fields, and we
would not need to modify the exception class to add them explicitly. But
as this happens very rarely, it does not seem to provide a huge benefit.
In the end, I would be fine either way. Curious to hear what others think.
-Matthias
On 4/18/24 8:41 AM, Andrew Schofield wrote:
Hi,
Thanks for the KIP. I think it’s an interesting idea and it seems to
work nicely with how the clients work today.
Recently, I’ve been digging into the record deserialization code in the
consumer as part of implementing KIP-932. It’s pretty nasty in there.
There are essentially two kinds of problems that can be encountered
when converting the response from a Fetch RPC into ConsumerRecords.
1) The batch might be corrupt, so you can’t even work out what records
are in there.
2) Individual records cannot be deserialized.
In the second case, the consumer accumulates records from a Fetch
response until it hits a record which it cannot deserialize. If it has
any parsed records already to return to the application, it parks the
RecordDeserializationException and returns the records so far. Then, on
the next poll, because there are no parsed records waiting, it throws
the RecordDeserializationException. And so it continues until all
fetched records are processed.
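To make that park-and-rethrow behavior concrete, here is a rough sketch of how an application typically reacts on its side: catch the exception that poll() eventually surfaces and seek past the bad offset so the next poll can make progress. This is a hedged illustration, not prescribed KIP behavior; it compiles only against the kafka-clients library, and the class and method names here are illustrative (only `topicPartition()` and `offset()` on the exception are the real, existing API).

```java
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.RecordDeserializationException;

public final class SkipBadRecords {
    // Polls once; if poll() surfaces a parked RecordDeserializationException,
    // seek past the offending offset so the next poll can make progress.
    // (The exception already carries topicPartition() and offset() today.)
    public static ConsumerRecords<byte[], byte[]> pollSkippingBadRecords(
            Consumer<byte[], byte[]> consumer) {
        try {
            return consumer.poll(Duration.ofMillis(100));
        } catch (RecordDeserializationException e) {
            consumer.seek(e.topicPartition(), e.offset() + 1);
            return ConsumerRecords.empty();
        }
    }
}
```

A real DLQ handler would forward the skipped record somewhere before seeking past it, which is exactly why this thread is discussing what payload the exception should carry.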
I don’t really think of org.apache.kafka.common.record.Record as an
external interface. I think it’s a bit close to the wire to be putting
on a user-facing interface.
We do know almost everything about the bad record at the point where
the deserialization fails. I wonder whether actually we could use
ConsumerRecord<ByteBuffer, ByteBuffer> or even ConsumerRecord<Void,
Void> :) in the constructor for the RecordDeserializationException. I
don’t really like having a long list of each of the record’s individual
parts, like timestamp and so on. It’s nicer to have an interface to the
record that we can evolve without having to change the constructor for
this exception.
Thanks,
Andrew
On 18 Apr 2024, at 15:13, Frédérik Rouleau <froul...@confluent.io.INVALID>
wrote:
Hi,
But I guess my main question is really about what metadata we really
want to add to `RecordDeserializationException`? `Record` exposes all
kinds of internal (serialization) metadata like `keySize()`,
`valueSize()`, and many more. For the DLQ use-case it seems we don't
really want any of these? So I am wondering if just adding
key/value/ts/headers would be sufficient?
I think that key/value/ts/headers, topicPartition and offset are all we
need. I do not see any usage for other metadata. If someone has a use case,
I would like to know it.
So in that case we can directly add the data into the exception. We can
keep ByteBuffer for the local fields instead of byte[], which will avoid
memory allocation if users do not require it.
I wonder if we should return the ByteBuffer or directly the byte[] (or
both?), whichever is more convenient for end users. Any thoughts?
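One point worth noting on the ByteBuffer-vs-byte[] question: if the exception stores ByteBuffers, the byte[] accessors have to copy defensively anyway, since the buffer may be read-only or a slice of a larger fetch buffer with a nonzero position. A minimal stdlib-only sketch of that conversion (the class and method names are illustrative, not part of any proposal):

```java
import java.nio.ByteBuffer;

public final class BufferUtil {
    // Copies the remaining bytes of the buffer into a fresh array without
    // disturbing the buffer's position (operates on a duplicate), and
    // returns null for a null buffer (e.g. a tombstone key or value).
    public static byte[] toArray(ByteBuffer buffer) {
        if (buffer == null) {
            return null;
        }
        ByteBuffer dup = buffer.duplicate();
        byte[] bytes = new byte[dup.remaining()];
        dup.get(bytes);
        return bytes;
    }
}
```

Exposing only the ByteBuffer would push this copy (and the null-handling) onto every caller, which argues for offering the byte[] accessor even if it allocates.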
Then we can have something like:
public RecordDeserializationException(TopicPartition partition,
                                      long offset,
                                      ByteBuffer key,
                                      ByteBuffer value,
                                      Header[] headers,
                                      long timestamp,
                                      String message,
                                      Throwable cause);

public TopicPartition topicPartition();
public long offset();
public long timestamp();
public byte[] key();   // Will allocate the array on call
public byte[] value(); // Will allocate the array on call
public Header[] headers();
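With accessors like those, a DLQ handler could build its ProducerRecord directly from the exception. A hypothetical sketch of that usage, assuming the proposed accessors land as shown above (none of `key()`, `value()`, `headers()`, or `timestamp()` exist on the exception today, and `dlqTopic` plus the producer setup are illustrative):

```java
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Header;

public final class DlqForwarder {
    // Hypothetical: forward the undeserializable record to a DLQ topic,
    // preserving key, value, timestamp, and headers from the exception.
    public static void forwardToDlq(Producer<byte[], byte[]> producer,
                                    String dlqTopic,
                                    RecordDeserializationException e) {
        ProducerRecord<byte[], byte[]> dlqRecord = new ProducerRecord<>(
                dlqTopic,
                null,           // let the partitioner choose the partition
                e.timestamp(),  // proposed accessor, not in the current API
                e.key(),        // allocates, per the proposed contract
                e.value());
        for (Header header : e.headers()) {
            dlqRecord.headers().add(header);
        }
        producer.send(dlqRecord);
    }
}
```

This is roughly the "handler builds a ProducerRecord" flow Matthias describes earlier in the thread, and it only needs the fields listed above.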
Regards,
Fred