Hi everyone,
If there is no more feedback to be addressed, then I will open voting
on this KIP early next week.

Regards 
Omnia

> On 28 May 2025, at 11:13, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
> 
> Hi Luke, I had another round of updates to the KIP.
> I have now made `PayloadStore::publish` return a simplified version of
> `PayloadResponse`, which will now simply have the "fullPayloadPath" and a
> `PayloadStoreException`.
> If PayloadResponse
> - contains a PayloadStoreException with the `isRetriable` flag set to true, then
> this will trigger the retry logic
> - if the response code is 404 and `large.message.skip.not.found.error` is set, then we will
> skip failing the download on the deserialiser side.
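> To make the retry/skip flow concrete, here is a rough sketch of how the
> deserialiser side could react (all class, field, and config names here are
> illustrative assumptions based on this thread, not the final KIP API):
> ```
> // Sketch only: PayloadStoreException, isRetriable, and the 404/skip
> // handling are assumptions from this discussion, not a final API.
> public class RetrySketch {
>     public static class PayloadStoreException extends RuntimeException {
>         public final boolean isRetriable;
>         public final int code;
>         public PayloadStoreException(String msg, boolean isRetriable, int code) {
>             super(msg);
>             this.isRetriable = isRetriable;
>             this.code = code;
>         }
>     }
>
>     public interface PayloadStore {
>         byte[] download(String fullPayloadPath) throws PayloadStoreException;
>     }
>
>     // Returns the payload, null when a 404 is skipped, or rethrows once
>     // the retries are exhausted or the exception is not retriable.
>     public static byte[] downloadWithRetry(PayloadStore store, String path,
>                                            int maxRetries, boolean skipNotFound) {
>         for (int attempt = 0; ; attempt++) {
>             try {
>                 return store.download(path);
>             } catch (PayloadStoreException e) {
>                 if (e.code == 404 && skipNotFound) {
>                     return null; // large.message.skip.not.found.error behaviour
>                 }
>                 if (!e.isRetriable || attempt >= maxRetries) {
>                     throw e;
>                 }
>                 // retriable failure: loop and try again
>             }
>         }
>     }
> }
> ```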
> 
> WDYT? 
> 
> Regards 
> Omnia
> 
> 
>> On 22 May 2025, at 15:16, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>> 
>> Hi Luke, sorry for the late response. 
>>> IMO, the retry should be the logic inside the
>>> "large.message.payload.store.class"
>>> implementation. If we really want it, I think we need to make it clear in
>>> which circumstance we will retry. For example, if it's an unknown exception
>>> thrown from S3 API, what will we do to it?
>> There was one class definition missing: PayloadStoreException, which is
>> used in PayloadResponse. PayloadStoreException has a retriable flag; if it is
>> true, then we retry, and the serialiser retries on any exception with this
>> flag set. We could make the publish and download methods throw
>> PayloadStoreException directly to simplify things. WDYT?
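>> For what it's worth, a minimal sketch of that simplified shape could look
>> like this (method names and the in-memory store are my assumptions for
>> illustration, not the final KIP API):
>> ```
>> import java.util.HashMap;
>> import java.util.Map;
>> import java.util.UUID;
>>
>> // Sketch: publish/download throw PayloadStoreException directly, so no
>> // wrapper response type is needed. All names here are illustrative.
>> public class PayloadStoreSketch {
>>     public static class PayloadStoreException extends RuntimeException {
>>         public PayloadStoreException(String msg) { super(msg); }
>>     }
>>
>>     public interface PayloadStore {
>>         // returns the reference (full payload path) of the stored payload
>>         String publish(String topic, byte[] data) throws PayloadStoreException;
>>         // returns the original payload for a previously published reference
>>         byte[] download(String fullPayloadPath) throws PayloadStoreException;
>>     }
>>
>>     // Trivial in-memory implementation, for illustration only.
>>     public static class InMemoryPayloadStore implements PayloadStore {
>>         private final Map<String, byte[]> blobs = new HashMap<>();
>>
>>         @Override
>>         public String publish(String topic, byte[] data) {
>>             String path = topic + "/" + UUID.randomUUID();
>>             blobs.put(path, data);
>>             return path;
>>         }
>>
>>         @Override
>>         public byte[] download(String path) {
>>             byte[] data = blobs.get(path);
>>             if (data == null) {
>>                 throw new PayloadStoreException("payload not found: " + path);
>>             }
>>             return data;
>>         }
>>     }
>> }
>> ```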
>> 
>>> Moving it into the store class makes it much clearer. And it's good to have
>>> a default implementation.
>>> But to me, it's still an implementation-level detail that we don't need to
>>> expose to users to implement it.
>>> Could I know more about when the Id generator will be invoked?
>>> My thought is :
>>> Users can implement a `publish` method to publish like this:
>>> public PayloadResponse publish(String topic, byte[] data) {
>>>  String id = genId();
>>>  // put the id and data to the remote storage
>>>  s3Client.put(id, data, ...);
>>>  return new PayloadResponse(id); // presumably wrapping the generated id
>>> }
>>> 
>>> So, with the id method in the interface, who will invoke it? Suppose it
>>> will be the serializer/deserializer, but no one passes the generated id to
>>> the publish method, how do we use it?
>> 
>> It will be invoked by the payload store class when publishing the payload,
>> so yes, it is an implementation detail of that class. 
>> 
>>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL
>>> object?
>> 
>> Originally because I needed a bit of a wrapper for the retry and skip
>> logic. But as I mentioned in the first point, we can make the publish and
>> download methods return the reference or payload, and otherwise throw
>> PayloadStoreException directly, to simplify things. WDYT? Another option is
>> to have publish return only the reference as a string, and have download be
>> the one that could throw PayloadNotFoundException. 
>> 
>>> 8. Could we change the abstract class to interface?
>> We can, once we agree on which configs we will drop and move into the
>> payload-store details provided by the users. 
>> 
>> regards
>> Omnia
>> 
>>> On 19 May 2025, at 09:54, Luke Chen <show...@gmail.com> wrote:
>>> 
>>> Hi Omnia,
>>> 
>>> Thanks for the explanation and update.
>>> It's better now.
>>> 
>>> Questions:
>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It
>>>> needs more explanation.
>>> My initial thinking was that the retry configs are a must for all blob
>>> stores, so we can provide and validate them for free for all blob stores,
>>> so that not every implementation has to verify them.
>>> 
>>> IMO, the retry should be the logic inside the
>>> "large.message.payload.store.class"
>>> implementation. If we really want it, I think we need to make it clear in
>>> which circumstance we will retry. For example, if it's an unknown exception
>>> thrown from S3 API, what will we do to it?
>>> 
>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>> Again, I thought we only need to replace value to a path, and add the
>>>> "large-message" header, so that when consumer reads this message, it'll
>>>> read the path from value and get the original data via BlobStore. Why do
>>> we
>>>> need this ID generator? I think users should do the object naming when
>>>> putting the object by themselves, not via another interface. WDYT?
>>> In some cases generating the ID might need some smart work. For example,
>>> to avoid S3 throttling, the recommended way in their docs is to create
>>> sub-paths under the original bucket; to decide on one, we might hash the
>>> data to find a suitable sub-path.
>>> Here is an example of how I would generate a path for an S3 file:
>>> ```
>>> public String id(byte[] data) {
>>>   // distributionFactor is a config for the id generator; it represents
>>>   // the max number of sub-folders under the bucket
>>>   String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % distributionFactor;
>>>   return subFolder + "/" + UUID.randomUUID().toString();
>>> }
>>> ```
>>> Hope this example clarifies things a bit. However, I do agree that it
>>> might not need its own class; I have moved it to be part of the store class.
>>> 
>>> Moving it into the store class makes it much clearer. And it's good to have
>>> a default implementation.
>>> But to me, it's still an implementation-level detail that we don't need to
>>> expose to users to implement it.
>>> Could I know more about when the Id generator will be invoked?
>>> My thought is :
>>> Users can implement a `publish` method to publish like this:
>>> public PayloadResponse publish(String topic, byte[] data) {
>>>  String id = genId();
>>>  // put the id and data to the remote storage
>>>  s3Client.put(id, data, ...);
>>>  return new PayloadResponse(id); // presumably wrapping the generated id
>>> }
>>> 
>>> So, with the id method in the interface, who will invoke it? Suppose it
>>> will be the serializer/deserializer, but no one passes the generated id to
>>> the publish method, how do we use it?
>>> 
>>> 7. Why do we need "PayloadResponse"? Why can't we return a String or a URL
>>> object?
>>> 
>>> 8. Could we change the abstract class to interface?
>>> 
>>> Thanks.
>>> Luke
>>> 
>>> 
>>> 
>>> 
>>> On Wed, Apr 30, 2025 at 9:21 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>> wrote:
>>> 
>>>> Hi Luke,
>>>> 
>>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>>> I thought all we want to do is to replace the "large value data" into a
>>>>>> path, and consumers will read the path via blob store class.
>>>>>> All these should be done in serializer/deserializer, so why do we need
>>>> the
>>>>>> formatter?
>>>>> 
>>>>> I wanted a bit more info than just the path to download; for example, I
>>>>> want to add things like the class path of the original blob store, in
>>>>> case the consumer is set up with a blob store that does not match the one
>>>>> used during publishing.
>>>>> I have updated the KIP to simplify this by having this always be a
>>>>> simple JSON of the path and the publisher class, which can be represented
>>>>> as PayloadReferenceValue. WDYT?
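>>>>> For illustration, such a reference value could look something like this
>>>>> (field names and values are my assumption of the simplified JSON, not the
>>>>> final KIP schema):
>>>>> ```
>>>>> {
>>>>>   "fullPayloadPath": "my-bucket/orders-3/4f9d2c1a-7e2b-4c0d-9a1f-0c3b5e6d7a8b",
>>>>>   "publisherClass": "com.example.S3PayloadStore"
>>>>> }
>>>>> ```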
>>>> 
>>>> I thought of another case where having the freedom to form the reference
>>>> might be a nice feature: DR. Imagine someone publishes large messages to
>>>> S3 and the references to a Kafka topic, and then they want DR. This is
>>>> achievable if they have a mirrored Kafka topic containing the references,
>>>> but if S3 is unreachable from the DR backup location, then the references
>>>> they have are a bit useless. However, if the message formatter is
>>>> customisable, then a developer can implement a composite store that
>>>> publishes to two store locations, publishes both references to Kafka as
>>>> one message, and the consumer store can download from whichever bucket is
>>>> available. I think keeping the door open to such a use case might be a
>>>> good feature, although the use case is also a bit questionable given the
>>>> latency it will add, as we would be publishing to N stores.
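>>>> To make the DR idea concrete, here is a rough sketch of such a composite
>>>> store (everything here, including how the two references are combined, is
>>>> an illustrative assumption, not part of the KIP):
>>>> ```
>>>> import java.util.HashMap;
>>>> import java.util.List;
>>>> import java.util.Map;
>>>>
>>>> // Sketch: publish to two backing stores and keep both references in one
>>>> // value, so the consumer can download from whichever store is reachable.
>>>> public class CompositeStoreSketch {
>>>>     public interface PayloadStore {
>>>>         String publish(String topic, byte[] data);
>>>>         byte[] download(String ref); // null when unreachable or missing
>>>>     }
>>>>
>>>>     // In-memory stand-in for a real bucket, with a reachability switch.
>>>>     public static class InMemoryStore implements PayloadStore {
>>>>         private final Map<String, byte[]> blobs = new HashMap<>();
>>>>         private final String name;
>>>>         public boolean reachable = true;
>>>>
>>>>         public InMemoryStore(String name) { this.name = name; }
>>>>
>>>>         @Override
>>>>         public String publish(String topic, byte[] data) {
>>>>             String ref = name + "/" + topic + "/" + blobs.size();
>>>>             blobs.put(ref, data);
>>>>             return ref;
>>>>         }
>>>>
>>>>         @Override
>>>>         public byte[] download(String ref) {
>>>>             return reachable ? blobs.get(ref) : null;
>>>>         }
>>>>     }
>>>>
>>>>     public static class CompositeStore {
>>>>         private final List<PayloadStore> stores;
>>>>
>>>>         public CompositeStore(List<PayloadStore> stores) { this.stores = stores; }
>>>>
>>>>         // Publish to every store; the combined reference lists them all.
>>>>         public String publish(String topic, byte[] data) {
>>>>             StringBuilder refs = new StringBuilder();
>>>>             for (PayloadStore s : stores) {
>>>>                 if (refs.length() > 0) refs.append(',');
>>>>                 refs.append(s.publish(topic, data));
>>>>             }
>>>>             return refs.toString();
>>>>         }
>>>>
>>>>         // Try each reference against each store until one succeeds.
>>>>         public byte[] download(String combinedRef) {
>>>>             for (String ref : combinedRef.split(",")) {
>>>>                 for (PayloadStore s : stores) {
>>>>                     byte[] data = s.download(ref);
>>>>                     if (data != null) return data;
>>>>                 }
>>>>             }
>>>>             return null;
>>>>         }
>>>>     }
>>>> }
>>>> ```
>>>> The extra publish latency to N stores is the obvious cost of this
>>>> flexibility.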
>>>> 
>>>> Regards
>>>> Omnia
>>>> 
>>>>> On 24 Apr 2025, at 17:40, Omnia Ibrahim <o.g.h.ibra...@gmail.com> wrote:
>>>>> 
>>>>> Hi Luke, thanks for taking the time to look into the KIP.
>>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It
>>>>>> needs more explanation.
>>>>> My initial thinking was that the retry configs are a must for all blob
>>>>> stores, so we can provide and validate them for free for all blob stores,
>>>>> so that not every implementation has to verify them.
>>>>> 
>>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>>> I thought all we want to do is to replace the "large value data" into a
>>>>>> path, and consumers will read the path via blob store class.
>>>>>> All these should be done in serializer/deserializer, so why do we need
>>>> the
>>>>>> formatter?
>>>>> 
>>>>> I wanted a bit more info than just the path to download; for example, I
>>>>> want to add things like the class path of the original blob store, in
>>>>> case the consumer is set up with a blob store that does not match the one
>>>>> used during publishing.
>>>>> I have updated the KIP to simplify this by having this always be a
>>>>> simple JSON of the path and the publisher class, which can be represented
>>>>> as PayloadReferenceValue. WDYT?
>>>>> 
>>>>>> 4. In the BlobStore, it looks like we presume users will use object
>>>> stores,
>>>>>> which is not good.
>>>>>> Could we make it more generic? Javadoc, method names, …
>>>>> This is a good point; I have updated the method names and Javadoc. I am
>>>>> also thinking of renaming the class to PayloadStore instead of BlobStore,
>>>>> as "blob store" is still tied to object stores. To set some context here,
>>>>> I am proposing this after working with some of the Apache Cassandra
>>>>> community, who are working on Cassandra CEP-44 <
>>>>> https://cwiki.apache.org/confluence/display/CASSANDRA/CEP-44%3A+Kafka+integration+for+Cassandra+CDC+using+Sidecar#CEP44:KafkaintegrationforCassandraCDCusingSidecar-LargeBlob>
>>>>> to handle large CDC records. The initial thinking was to publish any
>>>>> large CDC record to an object store instead of Kafka, which is why the
>>>>> naming suggested "blob store" only.
>>>>> 
>>>>>> 5. It would be good to have some explanation for the purpose of each new
>>>>>> interface/class, and clear javadoc for each method.
>>>>> 
>>>>> Updated the KIP with javadocs
>>>>> 
>>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>>>> Again, I thought we only need to replace value to a path, and add the
>>>>>> "large-message" header, so that when consumer reads this message, it'll
>>>>>> read the path from value and get the original data via BlobStore. Why
>>>> do we
>>>>>> need this ID generator? I think users should do the object naming when
>>>>>> putting the object by themselves, not via another interface. WDYT?
>>>>> In some cases generating the ID might need some smart work. For example,
>>>>> to avoid S3 throttling, the recommended way in their docs is to create
>>>>> sub-paths under the original bucket; to decide on one, we might hash the
>>>>> data to find a suitable sub-path.
>>>>> Here is an example of how I would generate a path for an S3 file:
>>>>> ```
>>>>> public String id(byte[] data) {
>>>>>   // distributionFactor is a config for the id generator; it represents
>>>>>   // the max number of sub-folders under the bucket
>>>>>   String subFolder = topic + "-" + Utils.toPositive(Utils.murmur2(data)) % distributionFactor;
>>>>>   return subFolder + "/" + UUID.randomUUID().toString();
>>>>> }
>>>>> ```
>>>>> Hope this example clarifies things a bit. However, I do agree that it
>>>>> might not need its own class; I have moved it to be part of the store
>>>>> class.
>>>>> 
>>>>> Please let me know WDYT about the final shape of the KIP now.
>>>>> 
>>>>> Thanks
>>>>> Omnia
>>>>> 
>>>>>> On 24 Apr 2025, at 13:31, Luke Chen <show...@gmail.com> wrote:
>>>>>> 
>>>>>> Hi Omnia,
>>>>>> 
>>>>>> Thanks for proposing this feature that many users expected.
>>>>>> 
>>>>>> Some comments:
>>>>>> 1. It's quite interesting to see the idea of chained
>>>>>> serializer/deserializer used here. I like it.
>>>>>> 
>>>>>> 2. It's not clear how the "retry count" comes into play in the KIP. It
>>>>>> needs more explanation.
>>>>>> 
>>>>>> 3. What does "LargeMessageFormatter" do in the process?
>>>>>> I thought all we want to do is to replace the "large value data" into a
>>>>>> path, and consumers will read the path via blob store class.
>>>>>> All these should be done in serializer/deserializer, so why do we need
>>>> the
>>>>>> formatter?
>>>>>> 
>>>>>> 4. In the BlobStore, it looks like we presume users will use object
>>>> stores,
>>>>>> which is not good.
>>>>>> Could we make it more generic? Javadoc, method names, ...
>>>>>> 
>>>>>> 5. It would be good to have some explanation for the purpose of each new
>>>>>> interface/class, and clear javadoc for each method.
>>>>>> 
>>>>>> 6. About the BlobIdGenerator, why do we need it, could you explain more?
>>>>>> Again, I thought we only need to replace value to a path, and add the
>>>>>> "large-message" header, so that when consumer reads this message, it'll
>>>>>> read the path from value and get the original data via BlobStore. Why
>>>> do we
>>>>>> need this ID generator? I think users should do the object naming when
>>>>>> putting the object by themselves, not via another interface. WDYT?
>>>>>> 
>>>>>> Thanks.
>>>>>> Luke
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Thu, Apr 10, 2025 at 9:31 PM Omnia Ibrahim <o.g.h.ibra...@gmail.com>
>>>>>> wrote:
>>>>>> 
>>>>>>> Hi there, I would like to start discussions on
>>>>>>> 
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-1159%3A+Large+message+reference+based+Serializer
>>>>>>> 
>>>>>>> Thanks
>>>>>>> Omnia
>> 
> 