Hi all,

Thanks for the responses.

Grace, those are indeed both challenges, thanks for flagging them. Regarding
expiry, we could consider a mark-and-sweep garbage collection approach: a
service consumes the topics that carry large messages and tracks the payload
references. Once a large message has no remaining references, its payload can
be removed from the external store.
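
To make that a bit more concrete, here's a very rough sketch of what such a
sweeper could look like. It assumes a plain Kafka consumer, a hypothetical
PayloadStore abstraction over the object store, and a "large-payload-ref"
record header holding the reference -- all of these names are placeholders,
not a concrete design:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class LargePayloadSweeper {

    /** Hypothetical abstraction over the payload backend (e.g. S3/GCS). */
    interface PayloadStore {
        Set<String> listAllReferences();
        void delete(String reference);
    }

    /** Mark: scan the topic from the earliest offset and collect every payload
     *  reference still reachable from a live (non-expired) Kafka record. */
    static Set<String> markLiveReferences(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        Set<String> live = new HashSet<>();
        consumer.subscribe(List.of(topic));
        ConsumerRecords<byte[], byte[]> records;
        // Simplification: stop once a poll returns nothing; a real service would
        // capture the end offsets before starting the scan and read up to them.
        while (!(records = consumer.poll(Duration.ofSeconds(5))).isEmpty()) {
            for (ConsumerRecord<byte[], byte[]> record : records) {
                Header ref = record.headers().lastHeader("large-payload-ref");
                if (ref != null) {
                    live.add(new String(ref.value(), StandardCharsets.UTF_8));
                }
            }
        }
        return live;
    }

    /** Sweep: delete any stored payload that no live record references anymore. */
    static void sweep(PayloadStore store, Set<String> liveReferences) {
        for (String reference : store.listAllReferences()) {
            if (!liveReferences.contains(reference)) {
                store.delete(reference);
            }
        }
    }
}

Races between the scan and producers writing new references would need care
(e.g. only sweeping objects older than the topic's retention), but hopefully
that illustrates the rough shape of the idea.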

Martijn, I will take a look at whether there are any prior discussions in the
Kafka community and send the proposal to the Kafka Dev mailing list if it
makes sense :). It'd be much preferred if this were natively supported by
Kafka, but since it currently isn't, I was also exploring making this work in
Flink.
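
For the Flink side, the rough shape of the pluggable interface I was imagining
is something like the sketch below. All names, including the config keys, are
just the placeholders from my earlier message and would be firmed up in a FLIP:

import java.io.Serializable;
import java.util.Map;

/**
 * Placeholder sketch of a pluggable large-message handler. The Kafka table
 * connector would instantiate the class named by `large-record-handling.class`
 * and call it around the existing (de)serialization schemas.
 */
public interface LargeMessageSerDe extends Serializable {

    /** Receives the user-supplied `large-record-handling.properties.*` values. */
    void open(Map<String, String> properties);

    /**
     * Producer path: if the serialized record is over a configured threshold,
     * upload it to the external store and return a small reference payload;
     * otherwise return the value unchanged.
     */
    byte[] serialize(String topic, byte[] value);

    /**
     * Consumer path: if the value is a reference, fetch the real payload from
     * the external store; otherwise return the value unchanged.
     */
    byte[] deserialize(String topic, byte[] value);

    void close();
}

The call sites would be the ones mentioned in my proposal below: wrapping the
value just before the ProducerRecord is built in
DynamicKafkaRecordSerializationSchema, and just after the ConsumerRecord is
received in DynamicKafkaDeserializationSchema.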



On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <martijnvis...@apache.org>
wrote:

> Hi Kevin,
>
> I just want to double check, were you planning to send this proposal to the
> Kafka Dev mailing list? Because I don't see directly how this affects Flink
> :)
>
> Best regards,
>
> Martijn
>
> On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com> wrote:
>
> > Hi Kevin,
> >
> > Thanks for starting this thread.
> >
> > This idea is something that was discussed in Kroxylicious (an open source
> > Kafka proxy, I'm a maintainer there). In that discussion [1] we came to the
> > conclusion that there are a couple of issues with implementing this:
> > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes could
> > cause extreme memory bloat in clients, as the entire thing would need to be
> > fed into the producer which could very quickly fill its buffers. Depending
> > on how the subsequent deserialization and payload fetch is handled at the
> > consumer end, it's likely that the same behaviour would also be seen there.
> > 2. Difficult to sync expiry - when Kafka deletes messages due to retention
> > (or topic compaction), it does so without notifying clients. There is no
> > (easy) way to ensure the associated payload is deleted from object storage
> > at the same time.
> >
> > It's not totally clear how Conduktor solved these issues, but IMO they are
> > worth keeping in mind. For Kroxylicious we decided these problems meant it
> > wasn't practical for us to implement this, but I'd be curious to know if
> > you've got any ideas :)
> >
> > Regards,
> > Grace
> >
> > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> >
> > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam <kevin....@shopify.com.invalid>
> > wrote:
> >
> > > Hi all,
> > >
> > > Writing to see if the community would be open to exploring a FLIP for the
> > > Kafka Table Connectors. The FLIP would allow for storing Kafka messages
> > > beyond a Kafka cluster's message limit (1 MB by default) out of band in
> > > cloud object storage or another backend.
> > >
> > > During serialization the message would be replaced with a reference, and
> > > during deserialization the reference would be used to fetch the large
> > > message and pass it to Flink. Something like Option 1 in this blog post
> > > <https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>.
> > >
> > > What do you think?
> > >
> > > We can make it generic by allowing users to implement their own
> > > LargeMessageSerializer/Deserializer interface for serializing and
> > > deserializing and handling interactions with object storage or some other
> > > backend.
> > >
> > > The Kafka Connectors can be extended to support ConfigOptions to
> > > specify the class to load, as well as some user-specified properties. For
> > > example: `large-record-handling.class` and
> > > `large-record-handling.properties.*` (where the user can specify any
> > > properties similar to how the Kafka Consumer and Producer properties are
> > > handled
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>).
> > >
> > > In terms of call sites for the LargeMessage handling, I think we can
> > > consider inside of DynamicKafkaDeserializationSchema
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java>
> > > and DynamicKafkaRecordSerializationSchema
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>,
> > > where the ConsumerRecord
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108>
> > > and ProducerRecords
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75>
> > > are passed respectively.
> > >
> > > If there's interest, I would be happy to help flesh out the proposal
> > > more.
> > >
> >
>
