Hi everyone, if there is no more feedback that needs to be addressed, then I will open voting on this KIP early next week.
Regards
Omnia

> On 28 May 2025, at 11:13, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
> Hi Luke, I had another round of updates to the KIP.
> `PayloadStore::publish` now returns a simplified version of "PayloadResponse", which will simply have the "fullPayloadPath" and a "PayloadException".
> If the PayloadResponse:
> - contains a PayloadStoreException with the `isRetriable` flag set to true, then this triggers the retry logic
> - has a 404 response code and large.message.skip.not.found.error is set, then we skip failing the download on the deserialiser side.
>
> WDYT?
>
> Regards
> Omnia
>
>> On 22 May 2025, at 15:16, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>> Hi Luke, sorry for the late response.
>>
>>> IMO, the retry should be the logic inside the "large.message.payload.store.class" implementation. If we really want it, I think we need to make it clear in which circumstances we will retry. For example, if it's an unknown exception thrown from the S3 API, what will we do with it?
>>
>> There was one class definition missing, PayloadStoreException, which is used in PayloadResponse. PayloadStoreException has a flag; if it is true, we retry, and the serializer retries on any exception with this flag set. We could make the publish and download methods throw PayloadStoreException directly to simplify things. WDYT?
>>
>>> Moving it into the store class makes it much clearer. And it's good to have a default implementation.
>>> But to me, it's still an implementation-level detail that we don't need to expose to users to implement it.
>>> Could I know more about when the Id generator will be invoked?
>>> My thought is: users can implement a `publish` method like this:
>>>
>>> public PayloadResponse publish(String topic, byte[] data) {
>>>     String id = genId();
>>>     // put the id and data to the remote storage
>>>     s3Client.put(id, data, ...);
>>> }
>>>
>>> So, with the id method in the interface, who will invoke it? Suppose it will be the serializer/deserializer, but no one passes the generated id to the publish method, how do we use it?
>>
>> It will be invoked by the payload store class when it publishes the payload. So yes, it is an implementation detail of this class.
>>
>>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL object?
>>
>> Originally because I needed a bit of a wrapper for the retry and skip logic. But, as I mentioned in the first point, we can make the publish and download methods return the reference or the payload and otherwise throw PayloadStoreException directly to simplify things. WDYT? Another option is to have publish return only the reference as a string, and download would be the one that could throw PayloadNotFoundException.
>>
>>> 8. Could we change the abstract class to interface?
>>
>> We can, once we agree on which configs we will drop and move into the payload details provided by the users.
>>
>> regards
>> Omnia
>>
>>> On 19 May 2025, at 09:54, Luke Chen <show...@gmail.com> wrote:
>>>
>>> Hi Omnia,
>>>
>>> Thanks for the explanation and update.
>>> It's better now.
>>>
>>> Questions:
>>>
>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It needs more explanation.
>>>
>>> My initial thinking is that the retry configs are a must for all blob stores, so we can provide and validate them for free for all blob stores, so that not every implementation has to verify them.
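
As a rough illustration of the retry and skip behaviour being discussed here: the sketch below shows how a store-level exception carrying an `isRetriable` flag could drive retries on the serializer side, and how a 404 combined with `large.message.skip.not.found.error` could be skipped on the deserializer side. All class names and signatures are assumptions pieced together from this thread, not the KIP's actual API.

```
// Hypothetical sketch based on this thread's wording; not the KIP's actual API.
public class PayloadStoreException extends RuntimeException {
    private final boolean retriable;
    private final int statusCode; // e.g. 404 when the payload is missing

    public PayloadStoreException(String message, boolean retriable, int statusCode) {
        super(message);
        this.retriable = retriable;
        this.statusCode = statusCode;
    }

    public boolean isRetriable() { return retriable; }
    public int statusCode() { return statusCode; }
}

// Minimal store contract assumed for the sketch.
interface PayloadStore {
    String publish(String topic, byte[] data); // returns the full payload path
    byte[] download(String fullPayloadPath);
}

class LargeMessageRetrySketch {
    // Serializer side: retry publish only when the store marks the failure as retriable.
    static String publishWithRetry(PayloadStore store, String topic, byte[] data, int maxRetries) {
        for (int attempt = 0; ; attempt++) {
            try {
                return store.publish(topic, data);
            } catch (PayloadStoreException e) {
                if (!e.isRetriable() || attempt >= maxRetries) throw e;
            }
        }
    }

    // Deserializer side: optionally skip a missing payload instead of failing,
    // mirroring the large.message.skip.not.found.error behaviour described above.
    static byte[] downloadOrSkip(PayloadStore store, String fullPayloadPath, boolean skipNotFound) {
        try {
            return store.download(fullPayloadPath);
        } catch (PayloadStoreException e) {
            if (e.statusCode() == 404 && skipNotFound) return null; // caller treats null as "skipped"
            throw e;
        }
    }
}
```

Whether that loop lives in the serializer or inside the store implementation is exactly the open question in this thread; the sketch only illustrates the flag-driven contract.
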
>>>
>>> IMO, the retry should be the logic inside the "large.message.payload.store.class" implementation. If we really want it, I think we need to make it clear in which circumstances we will retry. For example, if it's an unknown exception thrown from the S3 API, what will we do with it?
>>>
>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>> Again, I thought we only need to replace the value with a path, and add the "large-message" header, so that when the consumer reads this message, it'll read the path from the value and get the original data via the BlobStore. Why do we need this ID generator? I think users should do the object naming themselves when putting the object, not via another interface. WDYT?
>>>
>>> In some cases generating the ID might need some smart work, for example to avoid S3 throttling: the way recommended in their docs is to create sub-paths under the original bucket, and to decide the sub-path we might hash the data.
>>> Here is an example of how I would generate a path for an S3 file:
>>> ```
>>> public String id(byte[] data) {
>>>     // distributionFactor is a config for the Id generator; it represents the max number of sub-folders under the bucket
>>>     String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % distributionFactor;
>>>     return subFolder + "/" + UUID.randomUUID().toString();
>>> }
>>> ```
>>> Hope this example clarifies it a bit. However, I do agree it might not need its own class; I have moved it to be part of the store class.
>>>
>>> Moving it into the store class makes it much clearer. And it's good to have a default implementation.
>>> But to me, it's still an implementation-level detail that we don't need to expose to users to implement it.
>>> Could I know more about when the Id generator will be invoked?
>>> My thought is: users can implement a `publish` method like this:
>>>
>>> public PayloadResponse publish(String topic, byte[] data) {
>>>     String id = genId();
>>>     // put the id and data to the remote storage
>>>     s3Client.put(id, data, ...);
>>> }
>>>
>>> So, with the id method in the interface, who will invoke it? Suppose it will be the serializer/deserializer, but no one passes the generated id to the publish method, how do we use it?
>>>
>>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL object?
>>>
>>> 8. Could we change the abstract class to interface?
>>>
>>> Thanks.
>>> Luke
>>>
>>> On Wed, Apr 30, 2025 at 9:21 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>
>>>> Hi Luke,
>>>>
>>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>>> I thought all we want to do is to replace the "large value data" with a path, and consumers will read the path via the blob store class.
>>>>>> All these should be done in the serializer/deserializer, so why do we need the formatter?
>>>>>
>>>>> I wanted a bit more info than just the path to download; for example, I want to add things like the class path of the original blob store, in case the consumer is set up with a blob store that doesn't match the one used during publishing.
>>>>> I have updated the KIP to simplify this by making it always a simple JSON of the path and the publisher class, which can be represented as PayloadReferenceValue. WDYT?
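
For illustration, a reference value like the one described above could be as small as a two-field record serialized to JSON. The field and type names below are guesses at what a PayloadReferenceValue might carry, not the KIP's definition.

```
// Hypothetical sketch of the reference written into the Kafka record value;
// field names are assumptions, not the KIP's actual PayloadReferenceValue.
public record PayloadReferenceValue(String fullPayloadPath, String payloadStoreClass) {

    // Example output:
    // {"fullPayloadPath":"my-topic-3/6f9e2c1a","payloadStoreClass":"com.example.S3PayloadStore"}
    public String toJson() {
        return "{\"fullPayloadPath\":\"" + fullPayloadPath
                + "\",\"payloadStoreClass\":\"" + payloadStoreClass + "\"}";
    }
}
```

Carrying the publisher class alongside the path would let a consumer detect (or resolve) a mismatch between the store it is configured with and the store the producer actually used, which is the scenario described above.
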
>>>>
>>>> I thought of another case where having the freedom to form the reference might be a nice feature, which is DR. Imagine a case where someone publishes large messages to S3 and the references to a Kafka topic, and then they want DR. This is achievable if they have a mirrored Kafka topic which contains the references, but if S3 is unreachable from the DR backup location then the references they have are of little use. However, if the message formatter is customisable, then a dev can implement a more elaborate store that publishes to two store locations and publishes both references to Kafka as one message, and the consumer's store can download from whichever bucket is available. I think keeping the door open to such a use case might be a good feature, but it is also a bit questionable given the latency it will add, as we would be publishing to N stores.
>>>>
>>>> Regards
>>>> Omnia
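
The dual-store DR idea above could be sketched as a composite store that publishes to two locations and encodes both references into the single Kafka message; the consumer side then falls back to the secondary location if the primary is unreachable. This reuses the hypothetical PayloadStore and PayloadStoreException sketched earlier in the thread, and every name here is illustrative rather than part of the KIP.

```
// Hypothetical sketch of the DR idea above: publish to two stores and keep both
// references in one Kafka message. Names are illustrative, not from the KIP.
import java.util.List;

class DualStorePayloadStore implements PayloadStore {
    private final PayloadStore primary;   // e.g. the main-region bucket
    private final PayloadStore secondary; // e.g. a bucket reachable from the DR site

    DualStorePayloadStore(PayloadStore primary, PayloadStore secondary) {
        this.primary = primary;
        this.secondary = secondary;
    }

    // Publishes the payload twice and encodes both references into one value.
    @Override
    public String publish(String topic, byte[] data) {
        String primaryRef = primary.publish(topic, data);
        String secondaryRef = secondary.publish(topic, data);
        return primaryRef + "," + secondaryRef; // or a JSON list of references
    }

    // Downloads from whichever store is reachable.
    @Override
    public byte[] download(String references) {
        List<String> paths = List.of(references.split(","));
        try {
            return primary.download(paths.get(0));
        } catch (PayloadStoreException e) {
            return secondary.download(paths.get(1));
        }
    }
}
```

The second publish call is where the extra latency mentioned above comes from, so this would only make sense where DR matters more than produce latency.
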
>>>>> On 24 Apr 2025, at 17:40, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>
>>>>> Hi Luke, thanks for taking the time to look into the KIP.
>>>>>
>>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It needs more explanation.
>>>>>
>>>>> My initial thinking is that the retry configs are a must for all blob stores, so we can provide and validate them for free for all blob stores, so that not every implementation has to verify them.
>>>>>
>>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>>> I thought all we want to do is to replace the "large value data" with a path, and consumers will read the path via the blob store class.
>>>>>> All these should be done in the serializer/deserializer, so why do we need the formatter?
>>>>>
>>>>> I wanted a bit more info than just the path to download; for example, I want to add things like the class path of the original blob store, in case the consumer is set up with a blob store that doesn't match the one used during publishing.
>>>>> I have updated the KIP to simplify this by making it always a simple JSON of the path and the publisher class, which can be represented as PayloadReferenceValue. WDYT?
>>>>>
>>>>>> 4. In the BlobStore, it looks like we presume users will use object stores, which is not good.
>>>>>> Could we make it more generic? Javadoc, method names, …
>>>>>
>>>>> This is a good point; I have updated the method names and Javadoc. I am also thinking of renaming the class to PayloadStore instead of BlobStore, as "blob store" is still tied to object stores. To set some context: I am proposing this after working with some of the Apache Cassandra community who are working on Cassandra CEP-44 <https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-44%3A+Kafka+integration+for+Cassandra+CDC+using+Sidecar#CEP44:KafkaintegrationforCassandraCDCusingSidecar-LargeBlob> to handle large CDC records, and the initial thinking was to publish any large CDC record to an object store instead of Kafka, which is why the naming suggested "blob store" only.
>>>>>
>>>>>> 5. It would be good to have some explanation for the purpose of each new interface/class, and clear javadoc for each method.
>>>>>
>>>>> Updated the KIP with javadocs.
>>>>>
>>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>>>> Again, I thought we only need to replace the value with a path, and add the "large-message" header, so that when the consumer reads this message, it'll read the path from the value and get the original data via the BlobStore. Why do we need this ID generator? I think users should do the object naming themselves when putting the object, not via another interface. WDYT?
>>>>>
>>>>> In some cases generating the ID might need some smart work, for example to avoid S3 throttling: the way recommended in their docs is to create sub-paths under the original bucket, and to decide the sub-path we might hash the data.
>>>>> Here is an example of how I would generate a path for an S3 file:
>>>>> ```
>>>>> public String id(byte[] data) {
>>>>>     // distributionFactor is a config for the Id generator; it represents the max number of sub-folders under the bucket
>>>>>     String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % distributionFactor;
>>>>>     return subFolder + "/" + UUID.randomUUID().toString();
>>>>> }
>>>>> ```
>>>>> Hope this example clarifies it a bit. However, I do agree it might not need its own class; I have moved it to be part of the store class.
>>>>>
>>>>> Please let me know WDYT of the final shape of the KIP now.
>>>>>
>>>>> Thanks
>>>>> Omnia
>>>>>
>>>>>> On 24 Apr 2025, at 13:31, Luke Chen <show...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Omnia,
>>>>>>
>>>>>> Thanks for proposing this feature that many users have been asking for.
>>>>>>
>>>>>> Some comments:
>>>>>> 1. It's quite interesting to see the idea of a chained serializer/deserializer used here. I like it.
>>>>>>
>>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It needs more explanation.
>>>>>>
>>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>>> I thought all we want to do is to replace the "large value data" with a path, and consumers will read the path via the blob store class.
>>>>>> All these should be done in the serializer/deserializer, so why do we need the formatter?
>>>>>>
>>>>>> 4. In the BlobStore, it looks like we presume users will use object stores, which is not good.
>>>>>> Could we make it more generic? Javadoc, method names, ...
>>>>>>
>>>>>> 5. It would be good to have some explanation for the purpose of each new interface/class, and clear javadoc for each method.
>>>>>>
>>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>>>> Again, I thought we only need to replace the value with a path, and add the "large-message" header, so that when the consumer reads this message, it'll read the path from the value and get the original data via the BlobStore. Why do we need this ID generator? I think users should do the object naming themselves when putting the object, not via another interface. WDYT?
>>>>>>
>>>>>> Thanks.
>>>>>> Luke
>>>>>>
>>>>>> On Thu, Apr 10, 2025 at 9:31 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi there, I would like to start discussions on
>>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1159%3A+Large+message+reference+based+Serializer
>>>>>>>
>>>>>>> Thanks
>>>>>>> Omnia