Re: Potential Kafka Connector FLIP: Large Message Handling

2024-07-08 Thread Kevin Lam
Hi Fabian,

Awesome, this project looks great! Thanks for sharing. It would work well
with KafkaSource and the DataStream API as you've mentioned. We have
something similar internally, but where we are encountering difficulty is
in integrating it with the Flink SQL Kafka DynamicTableSource and
DynamicTableSink. Our Large Message SerDe uses a Kafka message header to
store the URI of the payload in object storage, and currently the Flink SQL
Format interfaces do not allow passing data to/from the Kafka message
headers, which led me to suggest my proposal. It's not easy for us to
change our Large Message SerDe to use the value to carry the reference, as
it's already widely used and would require a significant migration.
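
For context, on the DataStream side this pattern can already be expressed with
a custom KafkaRecordDeserializationSchema. Below is a minimal sketch of that
idea; the header name, the byte[] output type, and the use of Flink's
FileSystem for the fetch are simplified placeholders rather than our actual
implementation (and object-store credentials would still need to be available
to the deserializer):

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

/**
 * Sketch: if a (hypothetical) "large-message-uri" header is present, fetch the
 * payload from object storage via Flink's FileSystem abstraction; otherwise
 * emit the inline Kafka value as-is.
 */
public class LargeMessageAwareDeserializationSchema
        implements KafkaRecordDeserializationSchema<byte[]> {

    // Placeholder header key; the real SerDe's header name differs.
    private static final String URI_HEADER = "large-message-uri";

    @Override
    public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<byte[]> out)
            throws IOException {
        Header uriHeader = record.headers().lastHeader(URI_HEADER);
        if (uriHeader == null) {
            out.collect(record.value()); // small message, stored inline
            return;
        }
        // The header value is assumed to hold a URI such as s3://bucket/key.
        Path path = new Path(new String(uriHeader.value(), StandardCharsets.UTF_8));
        FileSystem fs = path.getFileSystem();
        try (FSDataInputStream in = fs.open(path)) {
            out.collect(in.readAllBytes());
        }
    }

    @Override
    public TypeInformation<byte[]> getProducedType() {
        return TypeInformation.of(byte[].class);
    }
}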

However, thinking further, maybe we should not bring any large-message
concerns into Flink at all, but instead better support reading and writing
headers from Flink Formats.

I'm aware of the existing work in progress on handling headers via FLIP-454
and this mailing list discussion.
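
Purely to illustrate the kind of Format-level change I have in mind (this is
not FLIP-454's actual design; the interface name and signature below are made
up):

import java.io.IOException;
import java.util.Map;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.util.Collector;

/**
 * Hypothetical extension point: a format deserializer that also receives the
 * Kafka record headers, so a SQL format could resolve header-based references
 * such as our large-message URIs.
 */
public interface HeaderAwareDeserializationSchema<T> extends DeserializationSchema<T> {

    /** Deserialize the value together with the record's headers. */
    void deserialize(byte[] message, Map<String, byte[]> headers, Collector<T> out)
            throws IOException;
}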

On Mon, Jul 8, 2024 at 10:08 AM Fabian Paul  wrote:

> Hi Kevin,
>
> I worked on a project [1] in the past that had a similar purpose. You
> should be able to use a similar approach with the existing KafkaSource by
> implementing your own KafkaRecordDeserializationSchema that hides the logic
> of pulling the records from blob storage from the connector. You can even
> use the linked project directly with the KafkaSource using [2] and [3].
>
> I agree there is room for improvement, like propagating Flink's Filesystem
> credentials to the custom deserializer, but the overall idea seems to
> require only very few changes to Flink.
>
> Best,
> Fabian
>
> [1] https://github.com/bakdata/kafka-large-message-serde
> [2]
>
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
> [3]
>
> https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
>
> On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam 
> wrote:
>
> > Hi all,
> >
> > Thanks for the responses.
> >
> > Grace, those are indeed both challenges, thanks for flagging them.
> > Regarding expiry, we could consider having a mark-and-sweep garbage
> > collection system: a service can consume the topics with large messages
> > and track references, and when there are no references left for a large
> > message, it can be removed.
> >
> > Martijn, I will take a look at whether there are any prior discussions
> > in the Kafka community and send the proposal to the Kafka Dev mailing
> > list if it makes sense :). It'd be much preferred if this were natively
> > supported by Kafka; since it's not currently, I was also exploring
> > making this work in Flink.
> >
> >
> >
> > On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser 
> > wrote:
> >
> > > Hi Kevin,
> > >
> > > I just want to double check, were you planning to send this proposal to
> > the
> > > Kafka Dev mailing list? Because I don't see directly how this affects
> > Flink
> > > :)
> > >
> > > Best regards,
> > >
> > > Martijn
> > >
> > > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood 
> > wrote:
> > >
> > > > Hi Kevin,
> > > >
> > > > Thanks for starting this thread.
> > > >
> > > > This idea is something that was discussed in Kroxylicious (an open
> > source
> > > > Kafka proxy, I'm a maintainer there). In that discussion [1] we came
> to
> > > the
> > > > conclusion that there are a couple of issues with implementing this:
> > > > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes
> > could
> > > > cause extreme memory bloat in clients, as the entire thing would need
> > to
> > > be
> > > > fed into the producer which could very quickly fill its buffers.
> > > Depending
> > > > on how the subsequent deserialization and payload fetch is handled at
> > the
> > > > consumer end, it's likely that the same behaviour would also be seen
> > > there.
> > > > 2. Difficult to sync expiry - when Kafka deletes messages due to
> > > retention
> > > > (or topic compaction), it does so without notifying clients. There is
> > no
> > > > (easy) way to ensure the associated payload is deleted from object
> > > storage
> > > > at the same time.
> > > >
> > > > It's not totally clear how Conduktor solved these issues, but IMO
> they
> > > are
> > > > worth keeping in mind. For Kroxylicious we decided these problems
> meant
> > > it
> > > > wasn't practical for us to implement this, but I'd be curious to know
> > if
> > > > you've got any ideas :)
> > > >
> > > > Regards,
> > > > Grace
> > > >
> > > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> > > >
> > > > On Sat, Jul 6, 2024 at 8:21 AM 

Re: Potential Kafka Connector FLIP: Large Message Handling

2024-07-08 Thread Fabian Paul
Hi Kevin,

I worked on a project [1] in the past that had a similar purpose. You
should be able to use a similar approach with the existing KafkaSource by
implementing your own KafkaRecordDeserializationSchema that hides the logic
of pulling the records from blob storage from the connector. You can even
use the linked project directly with the KafkaSource using [2] and [3].

I agree there is room for improvement, like propagating Flink's Filesystem
credentials to the custom deserializer, but the overall idea seems to
require only very few changes to Flink.

Best,
Fabian

[1] https://github.com/bakdata/kafka-large-message-serde
[2]
https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
[3]
https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
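
To sketch the wiring, something along these lines should work (untested: it
assumes LargeMessageDeserializer has a no-arg constructor, uses the
headers-aware deserialize overload shown in [3], and that the delegate
deserializer configured for [1] produces byte[]; the concrete large-message
configuration keys are documented in [1] and omitted here):

import java.util.HashMap;
import java.util.Map;

import com.bakdata.kafka.LargeMessageDeserializer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class LargeMessageKafkaSourceExample {

    public static KafkaSource<byte[]> buildSource() {
        // Configuration for the large-message serde (delegate deserializer,
        // blob-store location, credentials, ...). The concrete keys are defined
        // by the bakdata project and omitted here.
        Map<String, String> largeMessageConfig = new HashMap<>();

        KafkaRecordDeserializationSchema<byte[]> schema =
                new KafkaRecordDeserializationSchema<byte[]>() {

                    private transient LargeMessageDeserializer<byte[]> delegate;

                    @Override
                    public void open(DeserializationSchema.InitializationContext context) {
                        delegate = new LargeMessageDeserializer<>();
                        delegate.configure(largeMessageConfig, /* isKey */ false);
                    }

                    @Override
                    public void deserialize(
                            ConsumerRecord<byte[], byte[]> record, Collector<byte[]> out) {
                        // The serde decides from the record's headers (or a value prefix)
                        // whether the value is inline or a reference into blob storage.
                        out.collect(
                                delegate.deserialize(record.topic(), record.headers(), record.value()));
                    }

                    @Override
                    public TypeInformation<byte[]> getProducedType() {
                        return TypeInformation.of(byte[].class);
                    }
                };

        return KafkaSource.<byte[]>builder()
                .setBootstrapServers("broker:9092")          // placeholder
                .setTopics("large-message-topic")            // placeholder
                .setGroupId("flink-large-message-consumer")  // placeholder
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(schema)
                .build();
    }
}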

On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam 
wrote:

> Hi all,
>
> Thanks for the responses.
>
> Grace, those are indeed both challenges, thanks for flagging them. Regarding
> expiry, we could consider having a mark-and-sweep garbage collection
> system: a service can consume the topics with large messages and track
> references, and when there are no references left for a large message, it
> can be removed.
>
> Martijn, I will take a look at whether there are any prior discussions in
> the Kafka community and send the proposal to the Kafka Dev mailing list if
> it makes sense :). It'd be much preferred if this were natively supported
> by Kafka; since it's not currently, I was also exploring making this work
> in Flink.
>
>
>
> On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser 
> wrote:
>
> > Hi Kevin,
> >
> > I just want to double check, were you planning to send this proposal to
> the
> > Kafka Dev mailing list? Because I don't see directly how this affects
> Flink
> > :)
> >
> > Best regards,
> >
> > Martijn
> >
> > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood 
> wrote:
> >
> > > Hi Kevin,
> > >
> > > Thanks for starting this thread.
> > >
> > > This idea is something that was discussed in Kroxylicious (an open
> source
> > > Kafka proxy, I'm a maintainer there). In that discussion [1] we came to
> > the
> > > conclusion that there are a couple of issues with implementing this:
> > > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes
> could
> > > cause extreme memory bloat in clients, as the entire thing would need
> to
> > be
> > > fed into the producer which could very quickly fill its buffers.
> > Depending
> > > on how the subsequent deserialization and payload fetch is handled at
> the
> > > consumer end, it's likely that the same behaviour would also be seen
> > there.
> > > 2. Difficult to sync expiry - when Kafka deletes messages due to
> > retention
> > > (or topic compaction), it does so without notifying clients. There is
> no
> > > (easy) way to ensure the associated payload is deleted from object
> > storage
> > > at the same time.
> > >
> > > It's not totally clear how Conduktor solved these issues, but IMO they
> > are
> > > worth keeping in mind. For Kroxylicious we decided these problems meant
> > it
> > > wasn't practical for us to implement this, but I'd be curious to know
> if
> > > you've got any ideas :)
> > >
> > > Regards,
> > > Grace
> > >
> > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> > >
> > > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam  >
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > Writing to see if the community would be open to exploring a FLIP for
> > the
> > > > Kafka Table Connectors. The FLIP would allow for storing Kafka
> Messages
> > > > beyond a Kafka cluster's message limit (1 MB by default) out of band
> in
> > > > cloud object storage or another backend.
> > > >
> > > > During serialization the message would be replaced with a reference,
> > and
> > > > during deserialization the reference would be used to fetch the large
> > > > message and pass it to Flink. Something like Option 1 in this blog
> post
> > > > <
> > > >
> > >
> >
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> > > > >
> > > > .
> > > >
> > > > What do you think?
> > > >
> > > > We can make it generic by allowing users to implement their own
> > > > LargeMessageSerializer/Deserializer interface for serializing and
> > > > deserializing and handling interactions with object storage or some
> > other
> > > > backend.
> > > >
> > > > The Kafka Connectors can be extended to support ConfigOptions to
> > > > specify the class to load, as well as some user-specified properties.
> > For
> > > > example: `large-record-handling.class` and `
> > > > large-record-handling.properties.*` (where the user can specify any
> > > > properties similar to how 

Re: Potential Kafka Connector FLIP: Large Message Handling

2024-07-08 Thread Kevin Lam
Hi all,

Thanks for the responses.

Grace, those are indeed both challenges, thanks for flagging them. Regarding
expiry, we could consider having a mark-and-sweep garbage collection
system: a service can consume the topics with large messages and track
references, and when there are no references left for a large message, it
can be removed.
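
To make that a bit more concrete, here is a rough sketch of a single sweep
pass that could run as a service outside Flink (the "large-message-uri" header
name, the BlobStore abstraction, and the grace period for freshly written
objects are all hypothetical):

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class LargeMessageSweeper {

    /** Hypothetical blob-store abstraction; an S3/GCS client would sit behind it. */
    interface BlobStore {
        Map<String, Instant> listObjects(String prefix); // uri -> last modified
        void delete(String uri);
    }

    private static final String URI_HEADER = "large-message-uri"; // hypothetical

    /** One sweep: mark every URI still referenced on the topic, delete the rest. */
    static void sweep(Properties consumerProps, String topic, BlobStore store,
                      String prefix, Duration graceForNewObjects) {
        Set<String> referenced = new HashSet<>();
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
            List<TopicPartition> partitions = new ArrayList<>();
            consumer.partitionsFor(topic)
                    .forEach(p -> partitions.add(new TopicPartition(topic, p.partition())));
            consumer.assign(partitions);
            consumer.seekToBeginning(partitions);
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Mark phase: scan every record still retained on the topic.
            while (!reachedEnd(consumer, endOffsets)) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    Header h = record.headers().lastHeader(URI_HEADER);
                    if (h != null) {
                        referenced.add(new String(h.value(), StandardCharsets.UTF_8));
                    }
                }
            }
        }

        // Sweep phase: delete objects no retained record references, skipping very
        // recent objects so we do not race with producers that have uploaded a
        // payload but not yet published the corresponding Kafka record.
        Instant cutoff = Instant.now().minus(graceForNewObjects);
        store.listObjects(prefix).forEach((uri, lastModified) -> {
            if (!referenced.contains(uri) && lastModified.isBefore(cutoff)) {
                store.delete(uri);
            }
        });
    }

    private static boolean reachedEnd(KafkaConsumer<byte[], byte[]> consumer,
                                      Map<TopicPartition, Long> endOffsets) {
        // Done once the position on every partition has reached the end offset
        // captured at the start of the sweep.
        return endOffsets.entrySet().stream()
                .allMatch(e -> consumer.position(e.getKey()) >= e.getValue());
    }
}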

Martijn, I will take a look at whether there are any prior discussions in
the Kafka community and send the proposal to the Kafka Dev mailing list if
it makes sense :). It'd be much preferred if this were natively supported
by Kafka; since it's not currently, I was also exploring making this work
in Flink.



On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser 
wrote:

> Hi Kevin,
>
> I just want to double check, were you planning to send this proposal to the
> Kafka Dev mailing list? Because I don't see directly how this affects Flink
> :)
>
> Best regards,
>
> Martijn
>
> On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood  wrote:
>
> > Hi Kevin,
> >
> > Thanks for starting this thread.
> >
> > This idea is something that was discussed in Kroxylicious (an open source
> > Kafka proxy, I'm a maintainer there). In that discussion [1] we came to
> the
> > conclusion that there are a couple of issues with implementing this:
> > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes could
> > cause extreme memory bloat in clients, as the entire thing would need to
> be
> > fed into the producer which could very quickly fill its buffers.
> Depending
> > on how the subsequent deserialization and payload fetch is handled at the
> > consumer end, it's likely that the same behaviour would also be seen
> there.
> > 2. Difficult to sync expiry - when Kafka deletes messages due to
> retention
> > (or topic compaction), it does so without notifying clients. There is no
> > (easy) way to ensure the associated payload is deleted from object
> storage
> > at the same time.
> >
> > It's not totally clear how Conduktor solved these issues, but IMO they
> are
> > worth keeping in mind. For Kroxylicious we decided these problems meant
> it
> > wasn't practical for us to implement this, but I'd be curious to know if
> > you've got any ideas :)
> >
> > Regards,
> > Grace
> >
> > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
> >
> > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam 
> > wrote:
> >
> > > Hi all,
> > >
> > > Writing to see if the community would be open to exploring a FLIP for
> the
> > > Kafka Table Connectors. The FLIP would allow for storing Kafka Messages
> > > beyond a Kafka cluster's message limit (1 MB by default) out of band in
> > > cloud object storage or another backend.
> > >
> > > During serialization the message would be replaced with a reference,
> and
> > > during deserialization the reference would be used to fetch the large
> > > message and pass it to Flink. Something like Option 1 in this blog post
> > > <
> > >
> >
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> > > >
> > > .
> > >
> > > What do you think?
> > >
> > > We can make it generic by allowing users to implement their own
> > > LargeMessageSerializer/Deserializer interface for serializing and
> > > deserializing and handling interactions with object storage or some
> other
> > > backend.
> > >
> > > The Kafka Connectors can be extended to support ConfigOptions to
> > > specify the class to load, as well as some user-specified properties.
> For
> > > example: `large-record-handling.class` and `
> > > large-record-handling.properties.*` (where the user can specify any
> > > properties similar to how the Kafka Consumer and Producer properties
> are
> > > handled
> > > <
> > >
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201
> > > >
> > > ).
> > >
> > > In terms of call sites for the LargeMessage handling, I think we can
> > > consider inside of DynamicKafkaDeserializationSchema
> > > <
> > >
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
> > > >
> > > and DynamicKafkaRecordSerializationSchema
> > > <
> > >
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
> > > >,
> > > where the ConsumerRecord
> > > <
> > >
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108
> > > >
> > > and ProducerRecords
> > > <
> > >
> >
> 

Re: Potential Kafka Connector FLIP: Large Message Handling

2024-07-08 Thread Martijn Visser
Hi Kevin,

I just want to double check, were you planning to send this proposal to the
Kafka Dev mailing list? Because I don't see directly how this affects Flink
:)

Best regards,

Martijn

On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood  wrote:

> Hi Kevin,
>
> Thanks for starting this thread.
>
> This idea is something that was discussed in Kroxylicious (an open source
> Kafka proxy, I'm a maintainer there). In that discussion [1] we came to the
> conclusion that there are a couple of issues with implementing this:
> 1. Doesn't scale - very large messages (>1GiB) or large batch sizes could
> cause extreme memory bloat in clients, as the entire thing would need to be
> fed into the producer which could very quickly fill its buffers. Depending
> on how the subsequent deserialization and payload fetch is handled at the
> consumer end, it's likely that the same behaviour would also be seen there.
> 2. Difficult to sync expiry - when Kafka deletes messages due to retention
> (or topic compaction), it does so without notifying clients. There is no
> (easy) way to ensure the associated payload is deleted from object storage
> at the same time.
>
> It's not totally clear how Conduktor solved these issues, but IMO they are
> worth keeping in mind. For Kroxylicious we decided these problems meant it
> wasn't practical for us to implement this, but I'd be curious to know if
> you've got any ideas :)
>
> Regards,
> Grace
>
> [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
>
> On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam 
> wrote:
>
> > Hi all,
> >
> > Writing to see if the community would be open to exploring a FLIP for the
> > Kafka Table Connectors. The FLIP would allow for storing Kafka Messages
> > beyond a Kafka cluster's message limit (1 MB by default) out of band in
> > cloud object storage or another backend.
> >
> > During serialization the message would be replaced with a reference, and
> > during deserialization the reference would be used to fetch the large
> > message and pass it to Flink. Something like Option 1 in this blog post
> > <
> >
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> > >
> > .
> >
> > What do you think?
> >
> > We can make it generic by allowing users to implement their own
> > LargeMessageSerializer/Deserializer interface for serializing and
> > deserializing and handling interactions with object storage or some other
> > backend.
> >
> > The Kafka Connectors can be extended to support ConfigOptions to
> > specify the class to load, as well as some user-specified properties. For
> > example: `large-record-handling.class` and `
> > large-record-handling.properties.*` (where the user can specify any
> > properties similar to how the Kafka Consumer and Producer properties are
> > handled
> > <
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201
> > >
> > ).
> >
> > In terms of call sites for the LargeMessage handling, I think we can
> > consider inside of DynamicKafkaDeserializationSchema
> > <
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
> > >
> > and DynamicKafkaRecordSerializationSchema
> > <
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
> > >,
> > where the ConsumerRecord
> > <
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108
> > >
> > and ProducerRecords
> > <
> >
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75
> > >
> > are passed respectively.
> >
> > If there's interest, I would be happy to help flesh out the proposal
> more.
> >
>


Re: Potential Kafka Connector FLIP: Large Message Handling

2024-07-08 Thread Grace Grimwood
Hi Kevin,

Thanks for starting this thread.

This idea is something that was discussed in Kroxylicious (an open source
Kafka proxy, I'm a maintainer there). In that discussion [1] we came to the
conclusion that there are a couple of issues with implementing this:
1. Doesn't scale - very large messages (>1GiB) or large batch sizes could
cause extreme memory bloat in clients, as the entire thing would need to be
fed into the producer which could very quickly fill its buffers. Depending
on how the subsequent deserialization and payload fetch is handled at the
consumer end, it's likely that the same behaviour would also be seen there.
2. Difficult to sync expiry - when Kafka deletes messages due to retention
(or topic compaction), it does so without notifying clients. There is no
(easy) way to ensure the associated payload is deleted from object storage
at the same time.

It's not totally clear how Conduktor solved these issues, but IMO they are
worth keeping in mind. For Kroxylicious we decided these problems meant it
wasn't practical for us to implement this, but I'd be curious to know if
you've got any ideas :)

Regards,
Grace

[1] https://github.com/kroxylicious/kroxylicious/discussions/1244

On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam 
wrote:

> Hi all,
>
> Writing to see if the community would be open to exploring a FLIP for the
> Kafka Table Connectors. The FLIP would allow for storing Kafka Messages
> beyond a Kafka cluster's message limit (1 MB by default) out of band in
> cloud object storage or another backend.
>
> During serialization the message would be replaced with a reference, and
> during deserialization the reference would be used to fetch the large
> message and pass it to Flink. Something like Option 1 in this blog post
> <
> https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0
> >
> .
>
> What do you think?
>
> We can make it generic by allowing users to implement their own
> LargeMessageSerializer/Deserializer interface for serializing and
> deserializing and handling interactions with object storage or some other
> backend.
>
> The Kafka Connectors can be extended to support ConfigOptions to
> specify the class to load, as well as some user-specified properties. For
> example: `large-record-handling.class` and `
> large-record-handling.properties.*` (where the user can specify any
> properties similar to how the Kafka Consumer and Producer properties are
> handled
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201
> >
> ).
>
> In terms of call sites for the LargeMessage handling, I think we can
> consider inside of DynamicKafkaDeserializationSchema
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java
> >
> and DynamicKafkaRecordSerializationSchema
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
> >,
> where the ConsumerRecord
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108
> >
> and ProducerRecords
> <
> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75
> >
> are passed respectively.
>
> If there's interest, I would be happy to help flesh out the proposal more.
>
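
To make the quoted proposal a little more concrete, here is a rough sketch of
what the pluggable interface and connector option could look like. The option
name `large-record-handling.class` and the idea of a
LargeMessageSerializer/Deserializer come from the proposal above; the interface
shape, method signatures, and everything else are hypothetical:

import java.io.Serializable;
import java.util.Map;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.kafka.common.header.Headers;

/**
 * Hypothetical SPI: decides whether a serialized record is kept inline or
 * offloaded to an external store, and how a reference (in the value or a
 * header) is resolved back into the payload on the consuming side.
 */
public interface LargeMessageSerDe extends Serializable {

    /** Receives the user-supplied `large-record-handling.properties.*` map. */
    void configure(Map<String, String> properties);

    /** Sink side: may upload the payload and return a small reference instead. */
    byte[] serialize(String topic, Headers headers, byte[] payload);

    /** Source side: may fetch the payload referenced by the value or a header. */
    byte[] deserialize(String topic, Headers headers, byte[] valueOrReference);
}

/** Connector option named in the proposal; table-factory wiring omitted. */
class LargeRecordHandlingOptions {
    static final ConfigOption<String> LARGE_RECORD_HANDLING_CLASS =
            ConfigOptions.key("large-record-handling.class")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "Fully qualified name of the large-message SerDe implementation "
                                    + "to load; `large-record-handling.properties.*` would be "
                                    + "forwarded to its configure() method.");
}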