Hi Apurva,

Sorry for being late on this thread. I am trying to understand the implementation of the case in which we throw DuplicateSequenceException. My understanding is the following:

1. On the broker side, we cache the 5 most recent sequence/timestamp/offset (STO) entries for each producer ID.
2. When a duplicate occurs and the producer has max.in.flight.requests set to <= 5, we can return the timestamp/offset from the cached STO.
3. When a duplicate occurs and the producer has max.in.flight.requests greater than 5, we may need to return DuplicateSequenceException just to indicate that the sequence is a duplicate, but the ProduceResponse will not have a timestamp/offset because the entry may have fallen out of the 5 entries in the cache.
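To make sure we are talking about the same thing, here is a rough model of the behaviour described in the three points above. It is only a sketch of my understanding, not the broker code: the names `ProducerStoCache`, `Sto`, `ProduceResult` and `handle` are made up for illustration, it assumes one sequence number per batch, and it ignores out-of-order and gap handling.

```python
from collections import OrderedDict
from typing import NamedTuple, Optional

CACHE_SIZE = 5  # the "5 most recent" entries from point 1


class Sto(NamedTuple):
    """One cached sequence/timestamp/offset entry."""
    sequence: int
    timestamp: int
    offset: int


class ProduceResult(NamedTuple):
    duplicate: bool
    metadata: Optional[Sto]  # None when the entry is no longer cached


class ProducerStoCache:
    """Hypothetical cache for a single producer ID (illustration only)."""

    def __init__(self) -> None:
        self._entries: "OrderedDict[int, Sto]" = OrderedDict()
        self._last_sequence = -1

    def handle(self, sequence: int, timestamp: int, offset: int) -> ProduceResult:
        if sequence <= self._last_sequence:
            # Duplicate: return the cached metadata if we still have it
            # (point 2), otherwise report a duplicate with no metadata (point 3).
            return ProduceResult(True, self._entries.get(sequence))
        entry = Sto(sequence, timestamp, offset)
        self._entries[sequence] = entry
        self._last_sequence = sequence
        if len(self._entries) > CACHE_SIZE:
            self._entries.popitem(last=False)  # evict the oldest entry
        return ProduceResult(False, entry)
```

With seven batches appended (sequences 0 to 6), a retry of sequence 6 or 2 gets its original timestamp/offset back, while a retry of sequence 1 is recognised as a duplicate but has no metadata, which is the case where DuplicateSequenceException would come into play.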
Is my understanding correct? If that is the current implementation, I have a few concerns:

1. If there are a lot of producers producing to the same cluster (e.g. a Map-Reduce job), we may still spend a lot of memory on caching the most recent STO entries.
2. In most cases, we are essentially caching sequence/timestamp/offset entries that may never be used again.

Since we are making protocol changes, I am wondering if we can improve the above two cases by doing the following:

1. Add a per-partition LastAckedSequence field to the ProduceRequest.
2. The broker removes the cached STO entries whose sequence is less than or equal to LastAckedSequence for each partition/PID.
3. Have a global STO cache that caps the total number of cached STO entries at some number, say 1 million. This total is shared by all the producers. If the cap is reached, we remove entries from the producer that has the most cached STO entries.
4. If there is a sequence smaller than the last sequence and there is no entry in the STO cache, we return DuplicateSequenceException without offset/timestamp.

With the above changes, we get the following benefits:

1. We avoid caching sequence/timestamp/offset entries unnecessarily, because all the cached entries are entries that haven't been confirmed by the producer.
2. No magic number of 5 for max.in.flight.requests.per.connection.
3. A bounded memory footprint for the cached sequence/timestamp/offset entries.

Hope it's not too late to make the changes if that makes sense.

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <apu...@confluent.io> wrote:

> Thanks for the votes everyone.
>
> One of the proposals here was to raise a 'DuplicateSequenceException' to
> the user if the broker detected that one of the internal retries resulted
> in the duplicate, and the metadata for the original batch was no longer
> cached.
>
> However, when implementing this change, I realized that this is quite
> unintuitive from the user's point of view. In reality, the 'duplicate' is
> only due to internal retries -- something that the user has no visibility
> into. And secondly, this is not an error: the batch has been persisted,
> only the cached metadata has been lost.
>
> I think the better approach is to return a 'success' but make it clear
> that there is no record metadata. If the user tries to access
> `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the
> returned metadata, we can raise a 'NoMetadataAvailableException' or
> something like that.
>
> This way users who don't access the 'offset' and 'timestamp' fields would
> not notice a change. For the users who do, the offset and timestamp will
> not silently be invalid: they will be notified through an exception.
>
> This seems like the cleanest way forward and I would like to make this
> small change to the KIP.
>
> Does anybody have any objections?
>
> Thanks,
> Apurva
>
> On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Thanks for the comments Ismael.
> >
> > I have gone ahead and incorporated all your suggestions in the KIP
> > document. You convinced me on adding max.message.bytes :)
> >
> > Apurva
> >
> > On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> >
> >> Thanks for the KIP. +1 (binding) from me. A few minor comments:
> >>
> >> 1. We should add a note to the backwards compatibility section explaining
> >> the impact of throwing DuplicateSequenceException (a new exception) from
> >> `send`. As I understand it, it's not an issue, but good to include it in
> >> the KIP.
> >>
> >> 2. For clarity, it's good to highlight in some way the new fields in the
> >> protocol definition itself
> >>
> >> 3. I understand that you decided not to add max.message.bytes because it's
> >> unrelated to this KIP. I'll try to persuade you that we should, but it's
> >> not a blocker if you don't agree. The reasons are: 1. The implementation
> >> effort to add it is minimal since it's a topic config like message format
> >> version, 2. It's clearly beneficial for the producer to have that
> >> information, 3. It's compact (just a number), 4. It's nice to avoid another
> >> protocol bump for a small change like that.
> >>
> >> Thanks,
> >> Ismael
> >>
> >> On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:
> >>
> >> > Hi,
> >> >
> >> > I'd like to start a vote for KIP-192:
> >> >
> >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled
> >> >
> >> > Thanks,
> >> > Apurva