Hi all,

Thanks for the responses.

Grace, those are indeed both challenges, thanks for flagging them. Regarding
expiry, we could consider a mark-and-sweep garbage collection system: a
service consumes the topics that carry large messages and tracks references,
and when no references to a large message remain, its payload can be removed
from object storage.
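To sketch that idea (purely illustrative: the names are made up, it assumes a
single partition, and it glosses over compaction and failure handling), the
service could record the offset of each referencing record and delete a
payload once retention has moved the log start offset past it:

// Hypothetical sketch of the sweeper service's core bookkeeping.
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class LargeMessageSweeper {

    /** Minimal abstraction over the payload backend (S3, GCS, ...). */
    interface PayloadStore {
        void delete(String payloadUri);
    }

    // Payload URI -> offset of the Kafka record that references it.
    private final Map<String, Long> references = new HashMap<>();
    private final PayloadStore store;

    public LargeMessageSweeper(PayloadStore store) {
        this.store = store;
    }

    /** "Mark": called for every reference consumed from the topic. */
    public void onRecord(String payloadUri, long offset) {
        references.put(payloadUri, offset);
    }

    /** "Sweep": delete payloads whose referencing record has aged out. */
    public void sweep(long currentLogStartOffset) {
        Iterator<Map.Entry<String, Long>> it = references.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < currentLogStartOffset) {
                store.delete(entry.getKey());
                it.remove();
            }
        }
    }
}

This would sidestep the "no deletion notification" problem you raised by
polling the partition's log start offset rather than waiting for a callback
from Kafka.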
Martijn, I will take a look to see if there are any prior discussions in the
Kafka community and send the proposal to the Kafka Dev mailing list if it
makes sense :). Native support in Kafka would be much preferred, but since it
doesn't exist today, I was also exploring making this work in Flink.

On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <martijnvis...@apache.org> wrote:

> Hi Kevin,
>
> I just want to double check, were you planning to send this proposal to
> the Kafka Dev mailing list? Because I don't see directly how this affects
> Flink :)
>
> Best regards,
>
> Martijn
>
> On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com> wrote:
>
> > Hi Kevin,
> >
> > Thanks for starting this thread.
> >
> > This idea is something that was discussed in Kroxylicious (an open
> > source Kafka proxy; I'm a maintainer there). In that discussion [1] we
> > came to the conclusion that there are a couple of issues with
> > implementing this:
> > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes
> > could cause extreme memory bloat in clients, as the entire payload
> > would need to be fed into the producer, which could very quickly fill
> > its buffers. Depending on how the subsequent deserialization and
> > payload fetch is handled at the consumer end, it's likely that the
> > same behaviour would also be seen there.
> > 2. Difficult to sync expiry - when Kafka deletes messages due to
> > retention (or topic compaction), it does so without notifying clients.
> > There is no (easy) way to ensure the associated payload is deleted
> > from object storage at the same time.
> >
> > It's not totally clear how Conduktor solved these issues, but IMO they
> > are worth keeping in mind. For Kroxylicious we decided these problems
> > meant it wasn't practical for us to implement this, but I'd be curious
> > to know if you've got any ideas :)
> >
> > Regards,
> > Grace
> >
> > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> >
> > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam <kevin....@shopify.com.invalid>
> > wrote:
> >
> > > Hi all,
> > >
> > > Writing to see if the community would be open to exploring a FLIP for
> > > the Kafka Table Connectors. The FLIP would allow for storing Kafka
> > > messages that exceed a Kafka cluster's message size limit (1 MB by
> > > default) out of band in cloud object storage or another backend.
> > >
> > > During serialization the message would be replaced with a reference,
> > > and during deserialization the reference would be used to fetch the
> > > large message and pass it to Flink. Something like Option 1 in this
> > > blog post
> > > <https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>.
> > >
> > > What do you think?
> > >
> > > We can make it generic by allowing users to implement their own
> > > LargeMessageSerializer/Deserializer interface for serializing and
> > > deserializing records and handling interactions with object storage
> > > or some other backend.
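> > >
> > > As a very rough sketch, that interface could look something like the
> > > following (a single combined interface here; all names and method
> > > shapes are just illustrative, not a concrete proposal):
> > >
> > > /** Pluggable handler for offloading oversized payloads out of band. */
> > > public interface LargeMessageSerDe extends java.io.Serializable {
> > >
> > >     /** Called once on task start with the user-specified properties. */
> > >     void open(java.util.Properties properties) throws Exception;
> > >
> > >     /**
> > >      * Producer path: may upload the payload to the backend and return
> > >      * a small reference (e.g. an object-store URI) in its place.
> > >      */
> > >     byte[] serialize(byte[] payload) throws java.io.IOException;
> > >
> > >     /**
> > >      * Consumer path: resolves the bytes read from the topic back into
> > >      * the original payload, fetching from the backend when the bytes
> > >      * are a reference.
> > >      */
> > >     byte[] deserialize(byte[] payloadOrReference) throws java.io.IOException;
> > >
> > >     /** Called on task shutdown to release any clients. */
> > >     void close() throws Exception;
> > > }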
> > >
> > > The Kafka connectors can be extended to support ConfigOptions to
> > > specify the class to load, as well as some user-specified properties.
> > > For example: `large-record-handling.class` and
> > > `large-record-handling.properties.*` (where the user can specify any
> > > properties, similar to how the Kafka consumer and producer properties
> > > are handled
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>);
> > > a rough sketch of these options follows at the end of this message.
> > >
> > > In terms of call sites for the LargeMessage handling, I think we can
> > > consider inside of DynamicKafkaDeserializationSchema
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java>
> > > and DynamicKafkaRecordSerializationSchema
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>,
> > > where the ConsumerRecord
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108>
> > > and ProducerRecord
> > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75>
> > > are passed, respectively.
> > >
> > > If there's interest, I would be happy to help flesh out the proposal
> > > more.
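> > >
> > > To make the option wiring mentioned above concrete, here is a rough
> > > sketch of how the ConfigOptions might be declared (purely
> > > illustrative; none of these names are final, and the prefix handling
> > > mimics the existing `properties.*` passthrough):
> > >
> > > import org.apache.flink.configuration.ConfigOption;
> > > import org.apache.flink.configuration.ConfigOptions;
> > >
> > > public class LargeRecordHandlingOptions {
> > >
> > >     public static final ConfigOption<String> LARGE_RECORD_HANDLING_CLASS =
> > >             ConfigOptions.key("large-record-handling.class")
> > >                     .stringType()
> > >                     .noDefaultValue()
> > >                     .withDescription(
> > >                             "Fully qualified class name of the large-message "
> > >                                     + "serializer/deserializer to load.");
> > >
> > >     // Keys under this prefix would be stripped of the prefix and passed
> > >     // verbatim to the handler, e.g. large-record-handling.properties.s3.bucket.
> > >     public static final String PROPERTIES_PREFIX =
> > >             "large-record-handling.properties.";
> > > }
> > >
> > > Users would then set these in the table's WITH clause alongside the
> > > existing connector options.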