Hi Kevin,

Thanks for starting this thread.

This idea was discussed in Kroxylicious (an open source Kafka proxy; I'm a
maintainer there). In that discussion [1] we concluded that there are a
couple of issues with implementing this:
1. Doesn't scale well - very large messages (>1 GiB) or large batch sizes
could cause extreme memory bloat in clients, since the entire payload has to
be fed into the producer, which can very quickly fill its buffers. Depending
on how the subsequent deserialization and payload fetch are handled on the
consumer side, the same behaviour would likely be seen there as well.
2. Difficult to sync expiry - when Kafka deletes messages due to retention
(or topic compaction), it does so without notifying clients. There is no
(easy) way to ensure the associated payload is deleted from object storage
at the same time.

It's not totally clear how Conduktor solved these issues, but IMO they are
worth keeping in mind. For Kroxylicious we decided these problems meant it
wasn't practical for us to implement this, but I'd be curious to know if
you've got any ideas :)

Regards,
Grace

[1] https://github.com/kroxylicious/kroxylicious/discussions/1244

On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam <kevin....@shopify.com.invalid>
wrote:

> Hi all,
>
> Writing to see if the community would be open to exploring a FLIP for the
> Kafka Table Connectors. The FLIP would allow storing Kafka messages that
> exceed a cluster's message size limit (1 MB by default) out of band in
> cloud object storage or another backend.
>
> During serialization the message would be replaced with a reference, and
> during deserialization the reference would be used to fetch the large
> message and pass it to Flink. Something like Option 1 in this blog post:
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
>
> What do you think?
>
> We can make it generic by allowing users to implement their own
> LargeMessageSerializer/Deserializer interface for serializing and
> deserializing and handling interactions with object storage or some other
> backend.
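>
> To make this concrete, a rough sketch of what such an interface could look
> like (the name LargeRecordHandler and the method signatures below are just
> placeholders for discussion, not a settled design):
>
> // Hypothetical interface -- name and signatures are placeholders only.
> import java.io.IOException;
> import java.io.Serializable;
>
> public interface LargeRecordHandler extends Serializable {
>
>     // Produce path: store the payload out of band (e.g. object storage) if
>     // it exceeds a configured threshold, and return the bytes to actually
>     // write to Kafka (a small reference, or the original payload if it is
>     // small enough).
>     byte[] beforeProduce(String topic, byte[] payload) throws IOException;
>
>     // Consume path: if the value is a reference created by beforeProduce,
>     // fetch the real payload from the backend; otherwise return it
>     // unchanged.
>     byte[] afterConsume(String topic, byte[] value) throws IOException;
> }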
>
> The Kafka Connectors can be extended to support ConfigOptions to
> specify the class to load, as well as some user-specified properties. For
> example: `large-record-handling.class` and
> `large-record-handling.properties.*` (where the user can specify any
> properties, similar to how the Kafka consumer and producer properties are
> handled:
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201
> ).
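>
> As a sketch, the class option could be declared with Flink's ConfigOptions
> builder; the option keys and class names here are illustrative only,
> nothing like this exists in the connector today:
>
> // Illustrative option declarations for the Kafka table factory.
> import org.apache.flink.configuration.ConfigOption;
> import org.apache.flink.configuration.ConfigOptions;
>
> public class LargeRecordHandlingOptions {
>
>     public static final ConfigOption<String> LARGE_RECORD_HANDLING_CLASS =
>             ConfigOptions.key("large-record-handling.class")
>                     .stringType()
>                     .noDefaultValue()
>                     .withDescription(
>                             "Fully qualified class name of the LargeRecordHandler to load.");
>
>     // Keys under this prefix would be forwarded to the handler, analogous
>     // to how 'properties.*' keys are forwarded to the Kafka clients today.
>     public static final String LARGE_RECORD_HANDLING_PROPERTIES_PREFIX =
>             "large-record-handling.properties.";
> }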
>
> In terms of call sites for the LargeMessage handling, I think we can
> consider DynamicKafkaDeserializationSchema
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java)
> and DynamicKafkaRecordSerializationSchema
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java),
> where the ConsumerRecord
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108)
> and ProducerRecord
> (https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75)
> are passed, respectively.
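>
> For illustration, the consume-side hook could be a small wrapper that
> resolves references before the existing value deserialization runs (the
> produce side would call beforeProduce symmetrically before building the
> ProducerRecord). This is glue-code sketching only, reusing the hypothetical
> LargeRecordHandler above -- not the actual DynamicKafkaDeserializationSchema
> implementation:
>
> // Illustrative only: swap a reference value for the fetched payload.
> import java.io.IOException;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
>
> public class LargeRecordResolver {
>
>     private final LargeRecordHandler handler; // hypothetical, sketched above
>
>     public LargeRecordResolver(LargeRecordHandler handler) {
>         this.handler = handler;
>     }
>
>     // Replace the record value with the fetched payload when it is a
>     // reference; small records pass through untouched.
>     public ConsumerRecord<byte[], byte[]> resolve(ConsumerRecord<byte[], byte[]> record)
>             throws IOException {
>         byte[] resolved = handler.afterConsume(record.topic(), record.value());
>         return new ConsumerRecord<>(
>                 record.topic(), record.partition(), record.offset(), record.key(), resolved);
>     }
> }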
>
> If there's interest, I would be happy to help flesh out the proposal more.
>
