Hi Kevin,

You mention the link https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0 , so I assume this is the approach you are considering, and that it would be done at the connector level, since the message could be in any of the existing supported formats – meaning it is not appropriate as a new format.

It sounds like, for deserialization, the reference to external storage would be found in a header by your deserializer and the contents then fetched from the external store and placed into the Kafka body, with the reverse happening for serialization. This is different to my Apicurio work, which handles format-specific headers.
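For illustration only, a minimal sketch of what I understand such a deserializer might look like (the class name, header key, and external-store client below are hypothetical, not an existing API):

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class LargeMessageDeserializer implements Deserializer<byte[]> {

    private ExternalStoreClient store; // hypothetical client for S3/GCS/etc.

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // read the external storage location and credentials from the consumer properties (assumed keys)
        store = ExternalStoreClient.fromConfig(configs);
    }

    @Override
    public byte[] deserialize(String topic, Headers headers, byte[] data) {
        Header ref = headers.lastHeader("large.message.ref"); // hypothetical header key
        if (ref == null) {
            return data; // not a large message, pass the Kafka body through unchanged
        }
        // the header carries a URI; fetch the real payload from external storage
        return store.fetch(new String(ref.value(), StandardCharsets.UTF_8));
    }

    @Override
    public byte[] deserialize(String topic, byte[] data) {
        return data; // no headers available on this overload, nothing to resolve
    }

    @Override
    public void close() {
        store.close();
    }
}

The serializer side would presumably be the mirror image: upload the body when it exceeds a size threshold, write the URI into the header, and replace the Kafka body with something small.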
I assume you would want to add configuration to define where the external storage lives and how to authenticate to it. Limitations around stack and heap sizes would also be worth considering. Am I understanding your intent correctly?

Kind regards,
David

From: Kevin Lam <kevin....@shopify.com.INVALID>
Date: Wednesday, 10 July 2024 at 14:35
To: dev@flink.apache.org <dev@flink.apache.org>
Subject: [EXTERNAL] Re: Potential Kafka Connector FLIP: Large Message Handling

Hey all, just a follow-up here.

I was able to insert our Large Message handling by overriding value.serializer <https://kafka.apache.org/documentation/#producerconfigs_value.serializer> and value.deserializer <https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer> in the consumer and producer configuration that Flink sets, using the `properties.*` option in the Kafka Connector. This approach doesn't require Flink to know anything about large messages or have any major changes made to it.

Flink uses the ByteArray(De|S)erializers by default in its source <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470> and sink <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>. Overriding the source serializer requires a small change to flip this boolean <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464> to make it overridable. I'm planning to start a separate thread to propose making `value.serializer` overridable.

On Mon, Jul 8, 2024 at 11:18 AM Kevin Lam <kevin....@shopify.com> wrote:

> Hi Fabian,
>
> Awesome, this project looks great! Thanks for sharing. It would work well with KafkaSource and the DataStream API, as you've mentioned. We have something similar internally, but where we are encountering difficulty is integrating it with the Flink SQL Kafka DynamicTable Sources and Sinks. Our Large Message SerDe uses the Kafka message header to store the URI on object storage, and currently the Flink SQL Format interfaces do not allow passing data to/from the Kafka message headers, which led me to suggest my proposal. It's not easy for us to change our Large Message SerDe to use the value to provide the reference, as it's already widely used and would require a significant migration.
>
> However, thinking further, maybe we should not bring any large-message concerns into Flink, but instead better support reading and writing headers from Flink Formats.
>
> I'm aware of the existing work in progress on handling headers via FLIP-454 <https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format> and this mailing list discussion <https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.
>
> On Mon, Jul 8, 2024 at 10:08 AM Fabian Paul <fp...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> I worked on a project [1] in the past that had a similar purpose. You should be able to use a similar approach with the existing KafkaSource by implementing your own KafkaRecordDeserializationSchema that hides the blob-storage retrieval logic from the connector.
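>> A rough sketch of that shape (the blob-store client and header key here are placeholders, not part of the linked project):
>>
>> import java.io.IOException;
>> import java.nio.charset.StandardCharsets;
>> import org.apache.flink.api.common.serialization.DeserializationSchema;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
>> import org.apache.flink.util.Collector;
>> import org.apache.kafka.clients.consumer.ConsumerRecord;
>> import org.apache.kafka.common.header.Header;
>>
>> public class BlobBackedDeserializationSchema implements KafkaRecordDeserializationSchema<byte[]> {
>>
>>     private transient BlobStoreClient blobStore; // placeholder for an S3/GCS/filesystem client
>>
>>     @Override
>>     public void open(DeserializationSchema.InitializationContext context) throws Exception {
>>         blobStore = BlobStoreClient.create(); // build the storage client once per reader
>>     }
>>
>>     @Override
>>     public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<byte[]> out) throws IOException {
>>         byte[] value = record.value();
>>         Header ref = record.headers().lastHeader("large.message.ref"); // placeholder header key
>>         if (ref != null) {
>>             // the record only carries a pointer; pull the real payload from blob storage
>>             value = blobStore.fetch(new String(ref.value(), StandardCharsets.UTF_8));
>>         }
>>         out.collect(value);
>>     }
>>
>>     @Override
>>     public TypeInformation<byte[]> getProducedType() {
>>         return TypeInformation.of(byte[].class);
>>     }
>> }
>>
>> It can then be wired in with KafkaSource.<byte[]>builder().setDeserializer(new BlobBackedDeserializationSchema()).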
>> You can even use the linked project directly with the KafkaSource using [2] and [3].
>>
>> I agree there is room for improvement, like propagating Flink's Filesystem credentials to the custom deserializer, but the overall idea seems to require only very few changes to Flink.
>>
>> Best,
>> Fabian
>>
>> [1] https://github.com/bakdata/kafka-large-message-serde
>> [2] https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
>> [3] https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
>>
>> On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>>
>> > Hi all,
>> >
>> > Thanks for the responses.
>> >
>> > Grace, those are indeed both challenges, thanks for flagging them. Regarding expiry, we could consider having a mark-and-sweep garbage collection system: a service can consume the topics with large messages and track references, and when there are no references left for a large message, it can be removed.
>> >
>> > Martijn, I will take a look at whether there have been any prior discussions in the Kafka community and send the proposal to the Kafka dev mailing list if it makes sense :). It'd be much preferred if this were natively supported by Kafka; since it isn't currently, I was also exploring making this work in Flink.
>> >
>> > On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <martijnvis...@apache.org> wrote:
>> >
>> > > Hi Kevin,
>> > >
>> > > I just want to double check, were you planning to send this proposal to the Kafka dev mailing list? Because I don't see directly how this affects Flink :)
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com> wrote:
>> > >
>> > > > Hi Kevin,
>> > > >
>> > > > Thanks for starting this thread.
>> > > >
>> > > > This idea is something that was discussed in Kroxylicious (an open source Kafka proxy, I'm a maintainer there). In that discussion [1] we came to the conclusion that there are a couple of issues with implementing this:
>> > > > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes could cause extreme memory bloat in clients, as the entire thing would need to be fed into the producer, which could very quickly fill its buffers. Depending on how the subsequent deserialization and payload fetch is handled at the consumer end, it's likely that the same behaviour would also be seen there.
>> > > > 2. Difficult to sync expiry - when Kafka deletes messages due to retention (or topic compaction), it does so without notifying clients. There is no (easy) way to ensure the associated payload is deleted from object storage at the same time.
>> > > >
>> > > > It's not totally clear how Conduktor solved these issues, but IMO they are worth keeping in mind.
>> > > > For Kroxylicious we decided these problems meant it wasn't practical for us to implement this, but I'd be curious to know if you've got any ideas :)
>> > > >
>> > > > Regards,
>> > > > Grace
>> > > >
>> > > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
>> > > >
>> > > > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam <kevin....@shopify.com.invalid> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > Writing to see if the community would be open to exploring a FLIP for the Kafka Table Connectors. The FLIP would allow for storing Kafka messages beyond a Kafka cluster's message limit (1 MB by default) out of band in cloud object storage or another backend.
>> > > > >
>> > > > > During serialization the message would be replaced with a reference, and during deserialization the reference would be used to fetch the large message and pass it to Flink. Something like Option 1 in this blog post <https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>.
>> > > > >
>> > > > > What do you think?
>> > > > >
>> > > > > We can make it generic by allowing users to implement their own LargeMessageSerializer/Deserializer interface for serializing and deserializing and handling interactions with object storage or some other backend.
>> > > > >
>> > > > > The Kafka Connectors can be extended to support ConfigOptions to specify the class to load, as well as some user-specified properties. For example: `large-record-handling.class` and `large-record-handling.properties.*` (where the user can specify any properties, similar to how the Kafka Consumer and Producer properties are handled <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>).
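>> > > > > To make the shape concrete, one possible sketch of such an interface (the name, signatures, and header-based reference below are illustrative assumptions only, not an existing API or part of any FLIP yet):
>> > > > >
>> > > > > import java.io.IOException;
>> > > > > import java.io.Serializable;
>> > > > > import java.util.Map;
>> > > > > import org.apache.kafka.clients.consumer.ConsumerRecord;
>> > > > > import org.apache.kafka.clients.producer.ProducerRecord;
>> > > > >
>> > > > > public interface LargeMessageHandler extends Serializable {
>> > > > >
>> > > > >     // receives the user-specified large-record-handling.properties.* values
>> > > > >     void open(Map<String, String> properties) throws Exception;
>> > > > >
>> > > > >     // producer path: may upload an oversized payload to the backend and
>> > > > >     // return a small record carrying only a reference (e.g. in a header)
>> > > > >     ProducerRecord<byte[], byte[]> serialize(ProducerRecord<byte[], byte[]> record) throws IOException;
>> > > > >
>> > > > >     // consumer path: may resolve a reference back into the original payload
>> > > > >     ConsumerRecord<byte[], byte[]> deserialize(ConsumerRecord<byte[], byte[]> record) throws IOException;
>> > > > >
>> > > > >     void close() throws Exception;
>> > > > > }
>> > > > >
>> > > > > The connector would instantiate the class named by `large-record-handling.class` and pass it the `large-record-handling.properties.*` map in open().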
>> > > > > In terms of call sites for the LargeMessage handling, I think we can consider inside DynamicKafkaDeserializationSchema <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java> and DynamicKafkaRecordSerializationSchema <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>, where the ConsumerRecord <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108> and ProducerRecord <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75> are passed respectively.
>> > > > >
>> > > > > If there's interest, I would be happy to help flesh out the proposal more.
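>> > > > > As a purely illustrative sketch of the consumer-side call site (not actual connector code; `largeMessageHandler` is the hypothetical handler sketched above, loaded via `large-record-handling.class`), the hook might look roughly like this inside DynamicKafkaDeserializationSchema:
>> > > > >
>> > > > > @Override
>> > > > > public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector) throws Exception {
>> > > > >     if (largeMessageHandler != null) {
>> > > > >         // swap the stored reference for the full payload before the format sees it
>> > > > >         record = largeMessageHandler.deserialize(record);
>> > > > >     }
>> > > > >     // ... existing key/value deserialization and row projection continue unchanged ...
>> > > > > }
>> > > > >
>> > > > > The producer-side hook in DynamicKafkaRecordSerializationSchema would mirror this, calling serialize() on the ProducerRecord just before it is handed to the Kafka sink.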