As for the ConsumerRecord vs Record thing -- I personally think the
other alternative that Kirk mentioned would make more sense here,
that is, returning an Optional<ConsumerRecord<byte[], byte[]>> rather
than changing the type from ConsumerRecord to Record.

I'm not sure why checkstyle is saying we shouldn't use the Record
interface, but I'm a bit uncomfortable with ignoring this unless we have
someone who can explain why it's complaining and whether this
applies to our situation or not. I'm worried that Record has
something to do with the old legacy records and that using it here
would be a step backward. Note that most of the classes that
implement the Record interface have Legacy in their name, and
neither ConsumerRecord nor ProducerRecord, the classes most people
are familiar with, implements the Record interface.

That said, I literally have no context on the history of Record or why
it's not supposed to be used, so I welcome someone with more info
here to chime in. Without additional context, I'd say we should just
use the ConsumerRecord type as initially proposed, and make the
getter API return an Optional<ConsumerRecord<byte[], byte[]>>.
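
To make that concrete, here's a rough sketch of the shape I have in
mind, assuming the getter lives on RecordDeserializationException --
note that the field and method names here are made up, not a proposal
for the final API:

    import java.util.Optional;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.errors.SerializationException;

    // Sketch only: the real RecordDeserializationException carries more
    // state (partition, offset, etc.); this just shows the getter's shape.
    public class RecordDeserializationException extends SerializationException {

        // null whenever we can't reconstruct the raw record (the cases
        // I'm asking about below)
        private final ConsumerRecord<byte[], byte[]> rawRecord;

        public RecordDeserializationException(final String message,
                                              final ConsumerRecord<byte[], byte[]> rawRecord,
                                              final Throwable cause) {
            super(message, cause);
            this.rawRecord = rawRecord;
        }

        // hypothetical name; the point is the Optional return type
        public Optional<ConsumerRecord<byte[], byte[]>> rawRecord() {
            return Optional.ofNullable(rawRecord);
        }
    }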

I'm also personally unsure what might prevent us from being able to
form a ConsumerRecord. Matthias and/or Kirk, can you elaborate
on the specific cases we're worried about here? We should really
highlight those in the getter javadocs to explain why it's an Optional.
What fields would/could be missing?

On Tue, Apr 16, 2024 at 10:11 AM Sophie Blee-Goldman <sop...@responsive.dev>
wrote:

> I think some missing context here (which can maybe be added to the
> Motivation section as background) is that the deserialization is actually
> done within Streams, not within the Consumer. Since a consumer
> in Kafka Streams might be subscribed to multiple topics with different
> data types, it has to use a ByteArrayDeserializer so that the consumer
> just hands back the plain bytes, and then Kafka Streams can do all the
> deserialization based on its knowledge of the topic->serde mapping.
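>
> Concretely, the consumer setup is conceptually something like this
> (simplified sketch, not the actual internal Streams code -- the class,
> method, and group id names are just for illustration):
>
>     import java.util.Properties;
>     import org.apache.kafka.clients.consumer.ConsumerConfig;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
>     public class RawBytesConsumerSketch {
>         static KafkaConsumer<byte[], byte[]> rawBytesConsumer(final String bootstrapServers) {
>             final Properties props = new Properties();
>             props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
>             props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-streams-app");
>             // hand back plain bytes; Streams applies the per-topic serdes later
>             props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                       ByteArrayDeserializer.class.getName());
>             props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                       ByteArrayDeserializer.class.getName());
>             return new KafkaConsumer<>(props);
>         }
>     }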
>
> Streams will just store the set of records returned by each #poll as plain
> bytes, and only when a record is dequeued does it actually attempt to
> deserialize it. So there's only one record being deserialized at
> a time, and each one can be handled separately by the deserialization
> exception handler, which is what KIP-334 relies on.
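>
> In pseudo-Java, the dequeue path is conceptually something like the
> following (very simplified, not the real internals -- the method shape
> and parameter names are mine):
>
>     import java.util.List;
>     import java.util.function.BiConsumer;
>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>     import org.apache.kafka.common.serialization.Deserializer;
>     import org.apache.kafka.streams.errors.DeserializationExceptionHandler;
>     import org.apache.kafka.streams.errors.DeserializationExceptionHandler.DeserializationHandlerResponse;
>     import org.apache.kafka.streams.errors.StreamsException;
>     import org.apache.kafka.streams.processor.ProcessorContext;
>
>     public class DequeueSketch {
>         static <K, V> void drain(final List<ConsumerRecord<byte[], byte[]>> bufferedRecords,
>                                  final Deserializer<K> keyDeserializer,
>                                  final Deserializer<V> valueDeserializer,
>                                  final DeserializationExceptionHandler handler,
>                                  final ProcessorContext context,
>                                  final BiConsumer<K, V> process) {
>             for (final ConsumerRecord<byte[], byte[]> rawRecord : bufferedRecords) {
>                 try {
>                     // one record at a time, using this record's topic to pick the serde
>                     final K key = keyDeserializer.deserialize(rawRecord.topic(), rawRecord.key());
>                     final V value = valueDeserializer.deserialize(rawRecord.topic(), rawRecord.value());
>                     process.accept(key, value);
>                 } catch (final Exception e) {
>                     // failures are per record, so the handler decides record by record
>                     final DeserializationHandlerResponse response = handler.handle(context, rawRecord, e);
>                     if (response == DeserializationHandlerResponse.FAIL) {
>                         throw new StreamsException("Fatal deserialization error", e);
>                     }
>                     // CONTINUE: skip the poison record and keep going
>                 }
>             }
>         }
>     }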
>
> I do think it's worth saying something like that in the KIP somewhere,
> since it's somewhat obscure knowledge of Kafka Streams internals
> that not everyone will immediately know about. Feel free to just copy
> what I wrote or write something more succinct 🙂
>
> By the way, there are some minor formatting typos in the code snippet
> for the KIP. Not a big deal, but if you're editing the KIP anyway, you
> might as well fix those.
>
> On Tue, Apr 16, 2024 at 1:33 AM Frédérik Rouleau
> <froul...@confluent.io.invalid> wrote:
>
>> Hi Almog,
>>
>> I think you may have misunderstood the behavior that was introduced by
>> KIP-334.
>> When you get a DeserializationException, if you issue the proper seek call
>> to skip the faulty record, the next poll call will return the remaining
>> records to process, not a new batch of records. When the KIP was
>> released, I made a demo project,
>> https://github.com/fred-ro/kafka-poison-pills, though I'm not sure it's
>> still working; I should spend time maintaining it. The only issue with the
>> initial KIP is that you do not have access to the data of the faulty
>> record, which makes DLQ implementation quite difficult.
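>>
>> For reference, the KIP-334 pattern on the application side looks roughly
>> like this (minimal sketch; the poll timeout is arbitrary):
>>
>>     import java.time.Duration;
>>     import org.apache.kafka.clients.consumer.ConsumerRecords;
>>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>>     import org.apache.kafka.common.errors.RecordDeserializationException;
>>
>>     public class PoisonPillSketch {
>>         static void pollLoop(final KafkaConsumer<String, String> consumer) {
>>             while (true) {
>>                 try {
>>                     final ConsumerRecords<String, String> records =
>>                         consumer.poll(Duration.ofMillis(500));
>>                     records.forEach(r -> System.out.println(r.value()));
>>                 } catch (final RecordDeserializationException e) {
>>                     // seek past the faulty record; the next poll resumes with
>>                     // the remaining buffered records, not a refetched batch
>>                     consumer.seek(e.topicPartition(), e.offset() + 1);
>>                 }
>>             }
>>         }
>>     }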
>>
>> Regards,
>> Fred
>>
>
