Hi Luke, I had another round of updates to the KIP. I now made `PayloadStore::publish` return a simplified version of "PayloadResponse", which will now simply have the "fullPayloadPath" and "PayloadStoreException".
- If PayloadResponse contains a PayloadStoreException with the `isRetriable` flag set to true, then this will trigger the retry logic.
- If the response code is 404 and large.message.skip.not.found.error is set, then we will skip failing on the download on the deserialiser side.
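To make the new shape more concrete, here is a minimal sketch of how a serializer/deserializer pair could act on such a response. The `PayloadResponse` and `PayloadStoreException` shapes follow the description above; the `PayloadStore` interface shown here, the exact method signatures, the retry loop, and returning `null` for a skipped record are illustrative assumptions rather than the proposed API.

```java
// Sketch only: the shapes follow the KIP discussion above; the signatures, the
// PayloadStore interface, the retry loop and the null-on-skip behaviour are placeholders.
public class LargeMessageSketch {

    /** Simplified result of PayloadStore::publish: the path, or the failure that occurred. */
    public record PayloadResponse(String fullPayloadPath, PayloadStoreException exception) {
        boolean failed() { return exception != null; }
    }

    /** Carries what the (de)serializer needs to decide between retrying, skipping and failing. */
    public static class PayloadStoreException extends RuntimeException {
        private final boolean retriable;
        private final int responseCode;

        public PayloadStoreException(String message, boolean retriable, int responseCode) {
            super(message);
            this.retriable = retriable;
            this.responseCode = responseCode;
        }

        public boolean isRetriable() { return retriable; }
        public int responseCode() { return responseCode; }
    }

    /** Pluggable store; download throwing the same exception type is an assumption. */
    public interface PayloadStore {
        PayloadResponse publish(String topic, byte[] data);
        byte[] download(String fullPayloadPath) throws PayloadStoreException;
    }

    // Serializer side: retry publish only while the store reports a retriable failure.
    static String publishWithRetry(PayloadStore store, String topic, byte[] data, int maxRetries) {
        PayloadResponse response = store.publish(topic, data);
        for (int attempt = 0; response.failed() && response.exception().isRetriable() && attempt < maxRetries; attempt++) {
            response = store.publish(topic, data);
        }
        if (response.failed()) {
            throw response.exception();
        }
        return response.fullPayloadPath();
    }

    // Deserializer side: when large.message.skip.not.found.error is enabled, a 404 from the
    // store is swallowed instead of failing the fetch; the caller decides how to surface it.
    static byte[] downloadOrSkip(PayloadStore store, String path, boolean skipNotFoundError) {
        try {
            return store.download(path);
        } catch (PayloadStoreException e) {
            if (skipNotFoundError && e.responseCode() == 404) {
                return null; // record is skipped rather than failing deserialization
            }
            throw e;
        }
    }
}
```

Keeping the retriable flag and the response code on the exception means this one type is enough to drive both the retry logic on the serializer side and the skip-on-404 behaviour on the deserialiser side.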
WDYT?

Regards
Omnia

> On 22 May 2025, at 15:16, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>
> Hi Luke, sorry for the late response.
>
>> IMO, the retry should be the logic inside the "large.message.payload.store.class" implementation. If we really want it, I think we need to make it clear in which circumstance we will retry. For example, if it's an unknown exception thrown from the S3 API, what will we do with it?
>
> There was one class definition missing, which is PayloadStoreException; it is used in PayloadResponse. The PayloadStoreException has a flag; if it is true, then we retry, and the serialiser retries on any exception with this flag set. We can make the publish and download methods throw PayloadStoreException directly to simplify things, WDYT?
>
>> Moving it into the store class makes it much clearer. And it's good to have a default implementation.
>> But to me, it's still an implementation-level detail that we don't need to expose to users to implement it.
>> Could I know more about when the Id generator will be invoked?
>> My thought is:
>> Users can implement a `publish` method to publish like this:
>> public PayloadResponse publish(String topic, byte[] data) {
>>     String id = genId();
>>     // put the id and data to the remote storage
>>     s3Client.put(id, data, ...);
>> }
>>
>> So, with the id method in the interface, who will invoke it? Suppose it will be the serializer/deserializer, but no one passes the generated id to the publish method, how do we use it?
>
> It will be invoked by the payload store class when publishing the payload. So yes, it is an implementation detail of this class.
>
>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL object?
>
> Originally because I needed a bit of a wrapper for the retry and skip logic. But as I mentioned in the first point, we can make the publish and download methods return the ref or payload and otherwise throw PayloadStoreException directly to simplify things, WDYT? Another option is to have publish return only the ref as a string, and download would be the one that could throw PayloadNotFoundException.
>
>> 8. Could we change the abstract class to interface?
>
> We can once we agree on which configs we will drop and move to the details of the payload provided by the users.
>
> regards
> Omnia
>
>> On 19 May 2025, at 09:54, Luke Chen <show...@gmail.com> wrote:
>>
>> Hi Omnia,
>>
>> Thanks for the explanation and update.
>> It's better now.
>>
>> Questions:
>>> 2. It's not clear how the "retry count" comes into play in the KIP. It needs more explanation.
>> My initial thinking is that the retry configs are a must for all blob stores, so we can provide them and validate them for free for all blob stores, so that not every implementation will go through verifying them.
>>
>> IMO, the retry should be the logic inside the "large.message.payload.store.class" implementation. If we really want it, I think we need to make it clear in which circumstance we will retry. For example, if it's an unknown exception thrown from the S3 API, what will we do with it?
>>
>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? Again, I thought we only need to replace the value with a path, and add the "large-message" header, so that when the consumer reads this message, it'll read the path from the value and get the original data via BlobStore. Why do we need this ID generator? I think users should do the object naming when putting the object by themselves, not via another interface. WDYT?
>> In some cases generating the ID might need some smart work. For example, to avoid S3 throttling, the recommended way in their docs is to create sub-paths under the original bucket; to decide this we might hash the data to find a suitable sub-path.
>> Here is an example of how I would generate a path for an S3 file:
>> ```
>> public String id(byte[] data) {
>>     // distributionFactor is a config for the id generator; it represents the
>>     // max number of sub-folders under the bucket
>>     String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % distributionFactor;
>>     return subFolder + "/" + UUID.randomUUID().toString();
>> }
>> ```
>> Hope this example clarifies it a bit. However, I do agree it might not need a class here; I have moved it to be part of the store class.
>>
>> Moving it into the store class makes it much clearer. And it's good to have a default implementation.
>> But to me, it's still an implementation-level detail that we don't need to expose to users to implement it.
>> Could I know more about when the Id generator will be invoked?
>> My thought is:
>> Users can implement a `publish` method to publish like this:
>> public PayloadResponse publish(String topic, byte[] data) {
>>     String id = genId();
>>     // put the id and data to the remote storage
>>     s3Client.put(id, data, ...);
>> }
>>
>> So, with the id method in the interface, who will invoke it? Suppose it will be the serializer/deserializer, but no one passes the generated id to the publish method, how do we use it?
>>
>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL object?
>>
>> 8. Could we change the abstract class to interface?
>>
>> Thanks.
>> Luke
>>
>> On Wed, Apr 30, 2025 at 9:21 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>
>>> Hi Luke,
>>>
>>>>> 3. What does "LargeMessageFormatter" do in the process? I thought all we want to do is to replace the "large value data" with a path, and consumers will read the path via the blob store class. All these should be done in the serializer/deserializer, so why do we need the formatter?
>>>>
>>>> I wanted a bit more info than just the path to download; for example, I want to add things like the class path of the original blob store, in case the consumer is set up with a blob store that doesn't match the one used during publishing.
>>>> I have updated the KIP to simplify this by having this always be a simple JSON of path and publisher class, which can be represented as PayloadReferenceValue. WDYT?
>>>
>>> I thought of another case where having the freedom to form the reference might be a nice feature, which is DR. Let's imagine someone publishes large messages to S3 and the references to a Kafka topic, and then they want DR. This is achievable if they have a mirrored Kafka topic which contains the references, but if S3 is unreachable from the DR backup location then the references they have are a bit useless. However, if the message formatter is customisable, then a dev can implement a more complicated store that publishes to two store locations and publishes both references to Kafka as one message, and the consumer store can download from whichever bucket is available. I think keeping the door open to such a use case might be a good feature, but it is also a bit questionable given the latency it will add, as we will be publishing to N stores.
>>>
>>> Regards
>>> Omnia
>>>
>>>> On 24 Apr 2025, at 17:40, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>
>>>> Hi Luke, thanks for taking the time to look into the KIP.
>>>>
>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It needs more explanation.
>>>>
>>>> My initial thinking is that the retry configs are a must for all blob stores, so we can provide them and validate them for free for all blob stores, so that not every implementation will go through verifying them.
>>>>
>>>>> 3. What does "LargeMessageFormatter" do in the process? I thought all we want to do is to replace the "large value data" with a path, and consumers will read the path via the blob store class. All these should be done in the serializer/deserializer, so why do we need the formatter?
>>>>
>>>> I wanted a bit more info than just the path to download; for example, I want to add things like the class path of the original blob store, in case the consumer is set up with a blob store that doesn't match the one used during publishing.
>>>> I have updated the KIP to simplify this by having this always be a simple JSON of path and publisher class, which can be represented as PayloadReferenceValue. WDYT?
>>>>
>>>>> 4. In the BlobStore, it looks like we presume users will use object stores, which is not good. Could we make it more generic? Javadoc, method names, ...
>>>>
>>>> This is a good point; I have updated the method names and Javadoc. I am also thinking of renaming the class to PayloadStore instead of BlobStore, as "blob store" is still tied to object stores as well. To set some context here, I am proposing this after working with some of the Apache Cassandra community who are working on Cassandra CEP-44 <https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-44%3A+Kafka+integration+for+Cassandra+CDC+using+Sidecar#CEP44:KafkaintegrationforCassandraCDCusingSidecar-LargeBlob> to handle large CDC, and the initial thinking was to publish any large CDC to an object store instead of Kafka, which is why the naming suggested "blob store" only.
>>>>
>>>>> 5. It would be good to have some explanation for the purpose of each new interface/class, and clear javadoc for each method.
>>>>
>>>> Updated the KIP with javadocs.
>>>>
>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? Again, I thought we only need to replace the value with a path, and add the "large-message" header, so that when the consumer reads this message, it'll read the path from the value and get the original data via BlobStore. Why do we need this ID generator? I think users should do the object naming when putting the object by themselves, not via another interface. WDYT?
>>>>
>>>> In some cases generating the ID might need some smart work. For example, to avoid S3 throttling, the recommended way in their docs is to create sub-paths under the original bucket; to decide this we might hash the data to find a suitable sub-path.
>>>> Here is an example of how I would generate a path for an S3 file:
>>>> ```
>>>> public String id(byte[] data) {
>>>>     // distributionFactor is a config for the id generator; it represents the
>>>>     // max number of sub-folders under the bucket
>>>>     String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % distributionFactor;
>>>>     return subFolder + "/" + UUID.randomUUID().toString();
>>>> }
>>>> ```
>>>> Hope this example clarifies it a bit.
>>>> However, I do agree it might not need a class here; I have moved it to be part of the store class.
>>>>
>>>> Please let me know what you think of the final shape of the KIP now.
>>>>
>>>> Thanks
>>>> Omnia
>>>>
>>>>> On 24 Apr 2025, at 13:31, Luke Chen <show...@gmail.com> wrote:
>>>>>
>>>>> Hi Omnia,
>>>>>
>>>>> Thanks for proposing this feature that many users have been expecting.
>>>>>
>>>>> Some comments:
>>>>> 1. It's quite interesting to see the idea of chained serializer/deserializer used here. I like it.
>>>>>
>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It needs more explanation.
>>>>>
>>>>> 3. What does "LargeMessageFormatter" do in the process? I thought all we want to do is to replace the "large value data" with a path, and consumers will read the path via the blob store class. All these should be done in the serializer/deserializer, so why do we need the formatter?
>>>>>
>>>>> 4. In the BlobStore, it looks like we presume users will use object stores, which is not good. Could we make it more generic? Javadoc, method names, ...
>>>>>
>>>>> 5. It would be good to have some explanation for the purpose of each new interface/class, and clear javadoc for each method.
>>>>>
>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more? Again, I thought we only need to replace the value with a path, and add the "large-message" header, so that when the consumer reads this message, it'll read the path from the value and get the original data via BlobStore. Why do we need this ID generator? I think users should do the object naming when putting the object by themselves, not via another interface. WDYT?
>>>>>
>>>>> Thanks.
>>>>> Luke
>>>>>
>>>>> On Thu, Apr 10, 2025 at 9:31 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>>
>>>>>> Hi there, I would like to start discussions on
>>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1159%3A+Large+message+reference+based+Serializer
>>>>>>
>>>>>> Thanks
>>>>>> Omnia