Hey all, just a follow-up here. I was able to insert our Large Message
handling by overriding value.serializer
<https://kafka.apache.org/documentation/#producerconfigs_value.serializer>
and value.deserializer
<https://kafka.apache.org/documentation/#consumerconfigs_value.deserializer>
in the producer and consumer configuration that Flink sets, using the
`properties.*` option in the Kafka Connector. This approach doesn't require
Flink to know anything about large messages, and it needs no major changes
to Flink itself.
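
For anyone who wants to replicate this, here is a minimal sketch using the
Table API (the deserializer class name is a placeholder for your own large
message SerDe):

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class LargeMessageTableExample {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // The Kafka connector forwards every `properties.*` option into
            // the underlying consumer config, so the custom deserializer
            // replaces Flink's default ByteArrayDeserializer without any
            // connector changes.
            tEnv.executeSql(
                    "CREATE TABLE events (payload BYTES) WITH ("
                            + " 'connector' = 'kafka',"
                            + " 'topic' = 'events',"
                            + " 'properties.bootstrap.servers' = 'broker:9092',"
                            + " 'properties.value.deserializer' ="
                            + "   'com.example.LargeMessageDeserializer',"
                            + " 'format' = 'raw')");
        }
    }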

Flink uses the ByteArray(De|S)erializers by default in its source
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L457-L470>
and sink
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java#L82-L83>.
Overriding the source's deserializer requires a small change: flipping this
boolean
<https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java#L464>
so the setting becomes overridable. I'm planning to start a separate thread
to propose making `value.deserializer` overridable.
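
Concretely, the change I have in mind is roughly the following (a sketch
against the KafkaSourceBuilder linked above; maybeOverride's last argument
controls whether a user-supplied property is forcibly overridden):

    // In KafkaSourceBuilder, where the consumer properties are finalized:
    maybeOverride(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName(),
            false); // was `true`; `false` keeps a user-supplied
                    // value.deserializer from `properties.*` intact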


On Mon, Jul 8, 2024 at 11:18 AM Kevin Lam <kevin....@shopify.com> wrote:

> Hi Fabian,
>
> Awesome, this project looks great! Thanks for sharing. It would work well
> with KafkaSource and the DataStream API as you've mentioned. We have
> something similar internally, but where we are encountering difficulty is
> integrating it with the Flink SQL Kafka DynamicTable Sources and Sinks.
> Our Large Message SerDe uses a Kafka message header to store the object
> storage URI, and currently the Flink SQL Format interfaces do not allow
> passing data to/from the Kafka message headers, which led me to suggest my
> proposal. It's not easy for us to change our Large Message SerDe to use
> the value to provide the reference, as it's already widely used and would
> require a significant migration.
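>
> To illustrate the header-based approach, a simplified sketch of the
> producer side (the header name and the storage client are placeholders
> for our internal ones, not anything standard):
>
>     import java.nio.charset.StandardCharsets;
>     import org.apache.kafka.common.header.Headers;
>     import org.apache.kafka.common.serialization.Serializer;
>
>     // Hypothetical storage client standing in for our real one.
>     interface ObjectStoreClient {
>         String upload(String topic, byte[] payload);
>     }
>
>     public class HeaderLargeMessageSerializer implements Serializer<byte[]> {
>         private final ObjectStoreClient store;
>
>         public HeaderLargeMessageSerializer(ObjectStoreClient store) {
>             this.store = store;
>         }
>
>         @Override
>         public byte[] serialize(String topic, Headers headers, byte[] data) {
>             // Upload the payload and record its location in a header, not
>             // in the value -- this is why the format needs header access.
>             String uri = store.upload(topic, data);
>             headers.add("large-message-uri", uri.getBytes(StandardCharsets.UTF_8));
>             return new byte[0];
>         }
>
>         @Override
>         public byte[] serialize(String topic, byte[] data) {
>             throw new UnsupportedOperationException("headers are required");
>         }
>     }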
>
> However, thinking further, maybe we should not bring any large-message
> concerns into Flink, but instead better support reading and writing
> headers from Flink Formats.
>
> I'm aware of the existing work in progress on handling headers via
> FLIP-454
> <https://cwiki.apache.org/confluence/display/FLINK/FLIP-454%3A+New+Apicurio+Avro+format>
> and this mailing list discussion
> <https://lists.apache.org/thread/f1y8dzplmsjdzcwsw60cml1rnfl57nol>.
>
> On Mon, Jul 8, 2024 at 10:08 AM Fabian Paul <fp...@apache.org> wrote:
>
>> Hi Kevin,
>>
>> I worked on a project [1] in the past that had a similar purpose. You
>> should be able to use a similar approach with the existing KafkaSource by
>> implementing your own KafkaRecordDeserializationSchema that hides the
>> logic of pulling the records from blob storage away from the connector.
>> You can even use the linked project directly with the KafkaSource using
>> [2] and [3].
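>>
>> A minimal sketch of that wiring (the configuration key shown for the
>> bakdata SerDe is illustrative; check the project's docs for the real
>> ones):
>>
>>     import java.util.Map;
>>     import org.apache.flink.connector.kafka.source.KafkaSource;
>>     import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
>>     import com.bakdata.kafka.LargeMessageDeserializer;
>>
>>     Map<String, String> serdeConfig =
>>             Map.of("large.message.s3.enabled", "true"); // illustrative key
>>     KafkaSource<byte[]> source =
>>             KafkaSource.<byte[]>builder()
>>                     .setBootstrapServers("broker:9092")
>>                     .setTopics("events")
>>                     .setGroupId("large-message-demo")
>>                     // Raw class use compiles with an unchecked warning; the
>>                     // deserializer resolves references to blob storage
>>                     // before records reach Flink.
>>                     .setDeserializer(
>>                             KafkaRecordDeserializationSchema.valueOnly(
>>                                     LargeMessageDeserializer.class, serdeConfig))
>>                     .build();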
>>
>> I agree there is room for improvement, like propagating Flink's
>> Filesystem credentials to the custom deserializer, but the overall idea
>> seems to require only a few changes to Flink.
>>
>> Best,
>> Fabian
>>
>> [1] https://github.com/bakdata/kafka-large-message-serde
>> [2]
>> https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/deserializer/KafkaRecordDeserializationSchema.java#L107
>> [3]
>> https://github.com/bakdata/kafka-large-message-serde/blob/09eae933afaf8a1970b1b1bebcdffe934c368cb9/large-message-serde/src/main/java/com/bakdata/kafka/LargeMessageDeserializer.java#L50
>>
>> On Mon, Jul 8, 2024 at 3:49 PM Kevin Lam <kevin....@shopify.com.invalid>
>> wrote:
>>
>> > Hi all,
>> >
>> > Thanks for the responses.
>> >
>> > Grace, those are indeed both challenges, thanks for flagging them.
>> > Regarding expiry, we could consider a mark-and-sweep garbage collection
>> > system: a service consumes the topics containing large messages and
>> > tracks references; when no references to a large message's payload
>> > remain, the payload can be removed.
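>> >
>> > Roughly, the sweep side could look like the following (everything here
>> > is hypothetical: the header name, the storage client, and how retained
>> > records are scanned):
>> >
>> >     // Mark: collect payload URIs still referenced by retained records.
>> >     Set<String> live = new HashSet<>();
>> >     for (ConsumerRecord<byte[], byte[]> record : consumeAllRetainedRecords()) {
>> >         Header h = record.headers().lastHeader("large-message-uri");
>> >         if (h != null) {
>> >             live.add(new String(h.value(), StandardCharsets.UTF_8));
>> >         }
>> >     }
>> >     // Sweep: delete stored payloads that no retained record references.
>> >     for (String uri : objectStore.listPayloadUris()) {
>> >         if (!live.contains(uri)) {
>> >             objectStore.delete(uri);
>> >         }
>> >     }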
>> >
>> > Martijn, I will take a look at whether there have been any prior
>> > discussions in the Kafka community and send the proposal to the Kafka
>> > Dev mailing list if it makes sense :). It'd be much preferred if this
>> > were natively supported by Kafka; since it isn't currently, I was also
>> > exploring making this work in Flink.
>> >
>> >
>> >
>> > On Mon, Jul 8, 2024 at 3:23 AM Martijn Visser <martijnvis...@apache.org>
>> > wrote:
>> >
>> > > Hi Kevin,
>> > >
>> > > I just want to double check, were you planning to send this proposal
>> > > to the Kafka Dev mailing list? Because I don't see directly how this
>> > > affects Flink :)
>> > >
>> > > Best regards,
>> > >
>> > > Martijn
>> > >
>> > > On Mon, Jul 8, 2024 at 8:05 AM Grace Grimwood <ggrim...@redhat.com>
>> > > wrote:
>> > >
>> > > > Hi Kevin,
>> > > >
>> > > > Thanks for starting this thread.
>> > > >
>> > > > This idea is something that was discussed in Kroxylicious (an open
>> > > > source Kafka proxy; I'm a maintainer there). In that discussion [1]
>> > > > we came to the conclusion that there are a couple of issues with
>> > > > implementing this:
>> > > > 1. Doesn't scale - very large messages (>1GiB) or large batch sizes
>> > > > could cause extreme memory bloat in clients, as the entire payload
>> > > > would need to be fed into the producer, which could very quickly
>> > > > fill its buffers. Depending on how the subsequent deserialization
>> > > > and payload fetch is handled at the consumer end, it's likely that
>> > > > the same behaviour would also be seen there.
>> > > > 2. Difficult to sync expiry - when Kafka deletes messages due to
>> > > > retention (or topic compaction), it does so without notifying
>> > > > clients. There is no (easy) way to ensure the associated payload is
>> > > > deleted from object storage at the same time.
>> > > >
>> > > > It's not totally clear how Conduktor solved these issues, but IMO
>> > > > they are worth keeping in mind. For Kroxylicious we decided these
>> > > > problems meant it wasn't practical for us to implement this, but
>> > > > I'd be curious to know if you've got any ideas :)
>> > > >
>> > > > Regards,
>> > > > Grace
>> > > >
>> > > > [1] https://github.com/kroxylicious/kroxylicious/discussions/1244
>> > > >
>> > > > On Sat, Jul 6, 2024 at 8:21 AM Kevin Lam
>> > > > <kevin....@shopify.com.invalid> wrote:
>> > > >
>> > > > > Hi all,
>> > > > >
>> > > > > Writing to see if the community would be open to exploring a FLIP
>> > > > > for the Kafka Table Connectors. The FLIP would allow storing Kafka
>> > > > > messages that exceed a Kafka cluster's message size limit (1 MB by
>> > > > > default) out of band in cloud object storage or another backend.
>> > > > >
>> > > > > During serialization the message would be replaced with a
>> > > > > reference, and during deserialization the reference would be used
>> > > > > to fetch the large message and pass it to Flink. Something like
>> > > > > Option 1 in this blog post
>> > > > > <https://www.conduktor.io/kafka/how-to-send-large-messages-in-apache-kafka/#Option-1:-using-an-external-store-(GB-size-messages)-0>.
>> > > > >
>> > > > > What do you think?
>> > > > >
>> > > > > We can make it generic by allowing users to implement their own
>> > > > > LargeMessageSerializer/Deserializer interface that handles
>> > > > > serialization and deserialization as well as the interactions
>> > > > > with object storage or some other backend.
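>> > > > >
>> > > > > As a strawman, the interface might look something like this (all
>> > > > > names here are placeholders, not a final proposal):
>> > > > >
>> > > > >     import java.util.Map;
>> > > > >     import org.apache.kafka.common.header.Headers;
>> > > > >
>> > > > >     public interface LargeMessageSerDe extends AutoCloseable {
>> > > > >         void configure(Map<String, String> properties);
>> > > > >
>> > > > >         // Replace an oversized value with a small reference,
>> > > > >         // e.g. an object storage URI.
>> > > > >         byte[] serialize(String topic, Headers headers, byte[] value);
>> > > > >
>> > > > >         // Resolve a reference back into the original payload
>> > > > >         // before it reaches Flink.
>> > > > >         byte[] deserialize(String topic, Headers headers, byte[] value);
>> > > > >     }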
>> > > > >
>> > > > > The Kafka Connectors can be extended to support ConfigOptions to
>> > > > > specify the class to load, as well as some user-specified
>> > > > > properties. For example: `large-record-handling.class` and
>> > > > > `large-record-handling.properties.*` (where the user can specify
>> > > > > any properties, similar to how the Kafka Consumer and Producer
>> > > > > properties are handled
>> > > > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java#L201>).
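>> > > > >
>> > > > > Sketching the options with Flink's ConfigOptions builder (names
>> > > > > match the proposal above, but nothing is final):
>> > > > >
>> > > > >     import org.apache.flink.configuration.ConfigOption;
>> > > > >     import org.apache.flink.configuration.ConfigOptions;
>> > > > >
>> > > > >     public static final ConfigOption<String> LARGE_RECORD_HANDLING_CLASS =
>> > > > >             ConfigOptions.key("large-record-handling.class")
>> > > > >                     .stringType()
>> > > > >                     .noDefaultValue()
>> > > > >                     .withDescription(
>> > > > >                             "Class implementing the large-record SerDe.");
>> > > > >
>> > > > >     // `large-record-handling.properties.*` would be collected by
>> > > > >     // prefix, the same way `properties.*` is handled today.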
>> > > > >
>> > > > > In terms of call sites for the LargeMessage handling, I think we
>> > > > > can consider DynamicKafkaDeserializationSchema
>> > > > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java>
>> > > > > and DynamicKafkaRecordSerializationSchema
>> > > > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java>,
>> > > > > where the ConsumerRecord
>> > > > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaDeserializationSchema.java#L108>
>> > > > > and ProducerRecord
>> > > > > <https://github.com/apache/flink-connector-kafka/blob/15d3fbd4e65dae6c334e2386dd337d2bf423c216/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java#L75>
>> > > > > are passed, respectively.
>> > > > >
>> > > > > If there's interest, I would be happy to help flesh out the
>> > > > > proposal more.
>> > > > >
>> > > >
>> > >
>> >
>>
>
