Hi Becket,

Regarding the current implementation: we opted for a simpler server-side implementation where we _don't_ snapshot the metadata of the last 5 batches to disk. So if a broker fails, comes back online, and is the leader again, it will only have the last batch in memory. With max.in.flight = 5, it is thus possible to receive duplicates of 4 prior batches and yet not have the metadata.
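
As a rough illustration of the failover case described above, here is a minimal sketch. It is not the broker code: `BatchMetadata`, `ProducerEntry`, `restore_after_failover`, and `handle_retry` are hypothetical names, and each batch is assumed to carry a single sequence number for simplicity.

```python
from collections import deque
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class BatchMetadata:
    sequence: int
    offset: int
    timestamp: int


class ProducerEntry:
    """Hypothetical per-producer cache of recently appended batches."""

    def __init__(self, max_cached: int = 5) -> None:
        # Oldest entries fall off automatically once max_cached is reached.
        self.batches = deque(maxlen=max_cached)

    def append(self, meta: BatchMetadata) -> None:
        self.batches.append(meta)

    def is_duplicate(self, sequence: int) -> bool:
        # Anything at or below the last appended sequence was already written.
        return bool(self.batches) and sequence <= self.batches[-1].sequence

    def lookup(self, sequence: int) -> Optional[BatchMetadata]:
        for meta in self.batches:
            if meta.sequence == sequence:
                return meta
        return None

    def restore_after_failover(self) -> "ProducerEntry":
        # Assumption taken from the email: only the last batch survives.
        restored = ProducerEntry(self.batches.maxlen)
        if self.batches:
            restored.append(self.batches[-1])
        return restored


def handle_retry(entry: ProducerEntry, sequence: int) -> Tuple[bool, Optional[BatchMetadata]]:
    """Return (is_duplicate, cached metadata or None) for a retried batch."""
    if not entry.is_duplicate(sequence):
        return False, None
    return True, entry.lookup(sequence)


entry = ProducerEntry()
for seq in range(5):
    entry.append(BatchMetadata(seq, offset=100 + seq, timestamp=1_000 + seq))

restored = entry.restore_after_failover()
# Sequences that are recognised as duplicates but have no cached metadata.
missing = [s for s in range(5) if handle_retry(restored, s) == (True, None)]
print(missing)  # [0, 1, 2, 3]
```

With five batches in flight before the failover, the four older ones are still detected as duplicates, but only the last one can be answered with its offset and timestamp.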
Since we can return DuplicateSequence to the client, and since this is considered a successful response on the client, this is a good solution. It also means that we no longer have a hard limit of max.in.flight == 5: if you use a larger value, you are more likely to receive responses without the offset/timestamp metadata.

Your suggestion for sending the lastAckdSequence per partition would help reduce memory on the broker, but it won't bound it: we need at least one cached batch per producer to do sequence number validation, so it will always grow proportional to the number of active producers. Nor does it uniquely solve the problem of removing the cap on max.in.flight: producers are still not guaranteed that the metadata for all their inflight batches will always be cached and returned. So it doesn't strengthen any semantics, but does optimize memory usage on the broker.

But what's the usage with the proposed changes? With 5 cached batches, each producerIdEntry will be 142 bytes, so 7000 active producers who use idempotence will take about 1MB per broker. With 1 cached batch, we save at most 96 bytes per producer, so we could have at most 21000 active producers using idempotence per MB of broker memory.

The savings are significant, but I am not convinced optimizing this is worth it right now, since we are not making the idempotent producer the default in this release. Further, there are a bunch of other items which make exactly once semantics more usable which we are working on in this release. Given that there are only 8 days left till feature freeze, I would rather tackle the usability issues (KIP-192) than make these memory optimizations on the broker. The payoff from the latter seems much smaller, and it is always possible to do in the future.

What does the rest of the community think?

Thanks,
Apurva

On Mon, Sep 11, 2017 at 2:39 PM, Becket Qin <becket....@gmail.com> wrote:

> Hi Apurva,
>
> Sorry for being late on this thread.
> I am trying to understand the implementation of the case where we throw DuplicateSequenceException. My understanding is the following:
> 1. On the broker side, we will cache the 5 most recent sequence/timestamp/offset (STO) entries for each producer ID.
> 2. When a duplicate occurs and the producer has max.in.flight.requests set to <=5, we can return the timestamp/offset from the cached STO.
> 3. When a duplicate occurs and the producer has max.in.flight.requests greater than 5, we may need to return DuplicateSequenceException just to indicate that the sequence is a duplicate, but the ProduceResponse will not have a timestamp/offset because it may be outside the 5 entries in the cache.
>
> Is my understanding correct? If it is the current implementation, I have a few concerns:
> 1. One potential issue is that if there are a lot of producers producing to the same cluster (e.g. a Map-Reduce job), we may still spend a lot of memory on caching the most recent STO.
> 2. In most cases, we are essentially caching sequence/timestamp/offset entries that may never be used again.
>
> Since we are making protocol changes, I am wondering if we can improve the above two cases by doing the following:
> 1. Add a per-partition LastAckedSequence field in the ProduceRequest.
> 2. The broker will remove the cached STO entries whose sequence is less than or equal to LastAckedSequence for each Partition/PID.
> 3. Have a global STO cache to cap the total number of cached STO entries at some number, say 1 million. This total is shared by all the producers. If this number is reached, we will remove entries from the producer that has the most cached STO entries.
> 4. If there is a sequence smaller than the last sequence and there is no entry in the STO cache, we return DuplicateSequenceException without offset/timestamp.
>
> With the above changes, we can have the following benefits:
> 1.
>    avoid caching the sequence/timestamp/offset unnecessarily, because all the cached entries are entries that haven't been confirmed by the producer.
> 2. no magic number of 5 max.in.flight.requests.per.connection
> 3. bounded memory footprint on the cached sequence/timestamp/offset entries.
>
> Hope it's not too late to have the changes if that makes sense.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
> On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <apu...@confluent.io> wrote:
>
> > Thanks for the votes everyone.
> >
> > One of the proposals here was to raise a 'DuplicateSequenceException' to the user if the broker detected that one of the internal retries resulted in a duplicate, and the metadata for the original batch was no longer cached.
> >
> > However, when implementing this change, I realized that this is quite unintuitive from the user's point of view. In reality, the 'duplicate' is only due to internal retries -- something that the user has no visibility into. And secondly, this is not an error: the batch has been persisted, only the cached metadata has been lost.
> >
> > I think the better approach is to return a 'success' but make it clear that there is no record metadata. If the user tries to access the `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the returned metadata, we can raise a 'NoMetadataAvailableException' or something like that.
> >
> > This way users who don't access the 'offset' and 'timestamp' fields would not notice a change. For the users who do, the offset and timestamp will not silently be invalid: they will be notified through an exception.
> >
> > This seems like the cleanest way forward and I would like to make this small change to the KIP.
> >
> > Does anybody have any objections?
> >
> > Thanks,
> > Apurva
> >
> > On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <apu...@confluent.io> wrote:
> >
> > > Thanks for the comments Ismael.
> > >
> > > I have gone ahead and incorporated all your suggestions in the KIP document. You convinced me on adding max.message.bytes :)
> > >
> > > Apurva
> > >
> > > On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <ism...@juma.me.uk> wrote:
> > >
> > >> Thanks for the KIP. +1 (binding) from me. A few minor comments:
> > >>
> > >> 1. We should add a note to the backwards compatibility section explaining the impact of throwing DuplicateSequenceException (a new exception) from `send`. As I understand it, it's not an issue, but good to include it in the KIP.
> > >>
> > >> 2. For clarity, it's good to highlight in some way the new fields in the protocol definition itself.
> > >>
> > >> 3. I understand that you decided not to add max.message.bytes because it's unrelated to this KIP. I'll try to persuade you that we should, but it's not a blocker if you don't agree. The reasons are: 1. The implementation effort to add it is minimal since it's a topic config like message format version, 2. It's clearly beneficial for the producer to have that information, 3. It's compact (just a number), 4. It's nice to avoid another protocol bump for a small change like that.
> > >>
> > >> Thanks,
> > >> Ismael
> > >>
> > >> On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:
> > >>
> > >> > Hi,
> > >> >
> > >> > I'd like to start a vote for KIP-192:
> > >> >
> > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled
> > >> >
> > >> > Thanks,
> > >> > Apurva
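
The memory figures in the reply at the top of this thread (142 bytes per producerIdEntry with 5 cached batches, at most 96 bytes saved with 1 cached batch) can be checked with a few lines of arithmetic. This is only a sketch: it assumes "1MB" means 10^6 bytes, and the constant and function names are made up for illustration.

```python
# Figures quoted in the thread; everything else here is an assumption.
ENTRY_BYTES_5_BATCHES = 142   # per-producer entry with 5 cached batches
MAX_SAVING_BYTES = 96         # upper bound on the saving with 1 cached batch
MB = 10**6                    # assumed meaning of "1MB"

# Smallest possible entry if the full saving is realised.
ENTRY_BYTES_1_BATCH = ENTRY_BYTES_5_BATCHES - MAX_SAVING_BYTES


def producers_per_mb(entry_bytes: int) -> int:
    """How many per-producer entries fit in one (decimal) megabyte."""
    return MB // entry_bytes


print(ENTRY_BYTES_1_BATCH)                       # 46
print(7000 * ENTRY_BYTES_5_BATCHES)              # 994000 bytes, i.e. about 1MB
print(producers_per_mb(ENTRY_BYTES_5_BATCHES))   # 7042
print(producers_per_mb(ENTRY_BYTES_1_BATCH))     # 21739
```

Under these assumptions the results are in line with the rounded 7000 and 21000 producers per MB quoted in the reply, roughly a threefold difference.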