The KIP has passed with three binding +1 votes (Guozhang, Ismael, Jason) and no -1 or +0 votes.
Thanks to everyone for the feedback.

Apurva

On Mon, Sep 11, 2017 at 8:31 PM, Apurva Mehta <apu...@confluent.io> wrote:

Hi Becket,

You are right: the calculations are per partition produced to by each idempotent producer. I actually think this makes the problem more acute once we enable the idempotent producer by default. However, even the most optimized version will still result in an overhead of at least 46 bytes per (producer, broker, partition) triplet.

I will call this out in KIP-185. I think we would want to optimize the memory utilization of each (partition, broker, producer) triplet before turning on the default. Another option could be to default max.in.flight to 2 and retain the metadata of just 2 batches. This would significantly reduce the memory overhead in the default distribution, and in the most common cases would not result in produce responses without the record metadata. That may be a simple way to address the issue.

Thanks,
Apurva

On Mon, Sep 11, 2017 at 7:40 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Apurva,

Thanks for the explanation.

I think the STO will be per producer/partition, right? Am I missing something? You are right that the proposal does not strengthen the semantics. The goal is more about trying to bound the memory consumption to some reasonable number and use the memory more efficiently.

But I agree it is not a blocker for this KIP, as the memory pressure may not be that big. With 5K partitions per broker and 50 producers per partition on average, we are going to consume about 35 MB of memory with 5 entries (142 bytes) cached per producer and partition. So it is probably still OK, and it is more urgent to fix the upgrade path.

Thanks,

Jiangjie (Becket) Qin

On Mon, Sep 11, 2017 at 4:13 PM, Apurva Mehta <apu...@confluent.io> wrote:

Hi Becket,

Regarding the current implementation: we opted for a simpler server side implementation where we _don't_ snapshot the metadata of the last 5 batches to disk. So if a broker fails, comes back online, and is the leader again, it will only have the last batch in memory. With max.in.flight = 5, it is thus possible to receive duplicates of 4 prior batches and yet not have the metadata.

Since we can return DuplicateSequence to the client, and since this is considered a successful response on the client, this is a good solution. It also means that we no longer have a hard limit of max.in.flight == 5: if you use a larger value, you are more likely to receive responses without the offset/timestamp metadata.

Your suggestion of sending the LastAckedSequence per partition would help reduce memory on the broker, but it won't bound it: we need at least one cached batch per producer to do sequence number validation, so it will always grow proportional to the number of active producers. Nor does it uniquely solve the problem of removing the cap on max.in.flight: producers are still not guaranteed that the metadata for all their inflight batches will always be cached and returned.

So it doesn't strengthen any semantics, but it does optimize memory usage on the broker. What's the memory usage with the proposed changes? With 5 cached batches, each producerIdEntry will be 142 bytes, so 7000 active producers using idempotence will take 1 MB per broker. With 1 cached batch, we save at most 96 bytes per producer, so we could have at most 21000 active producers using idempotence per MB of broker memory.
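For reference, the arithmetic behind the figures quoted above (the 142-byte producerIdEntry size and the 96-byte saving are taken from the discussion; the rest is plain arithmetic):

public class IdempotentProducerMemoryEstimate {
    public static void main(String[] args) {
        long entryBytes = 142;        // producerIdEntry with 5 cached batches, as quoted above
        long savedWithOneBatch = 96;  // bytes saved per producer when caching only 1 batch

        // ~7,000 producers with 5 cached batches fit in 1 MB of broker memory
        System.out.println(1_000_000 / entryBytes);                        // 7042

        // With 1 cached batch an entry is 142 - 96 = 46 bytes, so ~21,000 producers per MB
        System.out.println(1_000_000 / (entryBytes - savedWithOneBatch));  // 21739

        // Becket's estimate: 5,000 partitions x 50 producers per partition x 142 bytes ~= 35 MB
        System.out.println(5_000L * 50 * entryBytes / (1024 * 1024) + " MiB"); // ~33 MiB (35.5 MB)
    }
}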
The savings are significant, but I am not convinced optimizing this is worth it right now since we are not making the idempotent producer the default in this release. Further, there are a bunch of other items which make exactly-once semantics more usable which we are working on in this release. Given that there are only 8 days left until feature freeze, I would rather tackle the usability issues (KIP-192) than make these memory optimizations on the broker. The payoff from the latter seems much smaller, and it is always possible to do in the future.

What does the rest of the community think?

Thanks,
Apurva

On Mon, Sep 11, 2017 at 2:39 PM, Becket Qin <becket....@gmail.com> wrote:

Hi Apurva,

Sorry for being late on this thread. I am trying to understand the implementation of the case where we will throw DuplicateSequenceException. My understanding is the following:

1. On the broker side, we will cache the 5 most recent sequence/timestamp/offset (STO) entries for each producer ID.
2. When a duplicate occurs and the producer has max.in.flight.requests set to <= 5, we can return the timestamp/offset from the cached STO entries.
3. When a duplicate occurs and the producer has max.in.flight.requests greater than 5, we may need to return DuplicateSequenceException just to indicate the sequence is a duplicate, but the ProduceResponse will not have the timestamp/offset because it may fall outside the 5 entries in the cache.

Is my understanding correct? If this is the current implementation, I have a few concerns:

1. One potential issue is that if there are a lot of producers producing to the same cluster (e.g. a Map-Reduce job), we may still spend a lot of memory on caching the most recent STO entries.
2. In most cases, we are essentially caching sequence/timestamp/offset entries that may never be used again.

Since we are making protocol changes, I am wondering if we can improve the above two cases by doing the following (a rough sketch follows below):

1. Add a per-partition LastAckedSequence field in the ProduceRequest.
2. The broker will remove the cached STO entries whose sequence is less than or equal to LastAckedSequence for each partition/PID.
3. Have a global STO cache to cap the total number of cached STO entries at some number, say 1 million. This total is shared by all the producers. If this number is reached, we will remove entries from the producer which has the most cached STO entries.
4. If there is a sequence smaller than the last sequence and there is no entry in the STO cache, we return DuplicateSequenceException without offset/timestamp.

With the above changes, we can have the following benefits:

1. Avoid caching sequence/timestamp/offset entries unnecessarily, because all the cached entries are entries that haven't been confirmed by the producer.
2. No magic number of 5 for max.in.flight.requests.per.connection.
3. Bounded memory footprint for the cached sequence/timestamp/offset entries.

Hope it's not too late to have the changes if that makes sense.

Thanks,

Jiangjie (Becket) Qin
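A rough sketch of the bookkeeping proposed above, keyed per producer/partition. The class and field names are purely illustrative and not taken from the broker code, and the global cap (step 3 of the proposal) is omitted for brevity:

import java.util.ArrayDeque;
import java.util.Deque;

// Illustrative only: one cached batch-metadata queue per (producerId, partition),
// pruned by the LastAckedSequence the producer reports in the ProduceRequest.
class StoCache {
    static class Entry {
        final int sequence;
        final long timestamp;
        final long offset;
        Entry(int sequence, long timestamp, long offset) {
            this.sequence = sequence;
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();

    void add(int sequence, long timestamp, long offset) {
        entries.addLast(new Entry(sequence, timestamp, offset));
    }

    // Step 2 of the proposal: drop entries the producer has already acknowledged.
    void pruneUpTo(int lastAckedSequence) {
        while (!entries.isEmpty() && entries.peekFirst().sequence <= lastAckedSequence)
            entries.removeFirst();
    }

    // Step 4: a duplicate whose metadata has been pruned gets no offset/timestamp back.
    Entry lookup(int sequence) {
        for (Entry e : entries)
            if (e.sequence == sequence)
                return e;
        return null; // caller would respond with DuplicateSequenceException, no metadata
    }
}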
On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <apu...@confluent.io> wrote:

Thanks for the votes everyone.

One of the proposals here was to raise a 'DuplicateSequenceException' to the user if the broker detected that one of the internal retries resulted in the duplicate, and the metadata for the original batch was no longer cached.

However, when implementing this change, I realized that this is quite unintuitive from the user's point of view. In reality, the 'duplicate' is only due to internal retries -- something that the user has no visibility into. And secondly, this is not an error: the batch has been persisted, only the cached metadata has been lost.

I think the better approach is to return a 'success' but make it clear that there is no record metadata. If the user tries to access the `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of the returned metadata, we can raise a 'NoMetadataAvailableException' or something like that.

This way users who don't access the 'offset' and 'timestamp' fields would not notice a change. For the users who do, the offset and timestamp will not silently be invalid: they will be notified through an exception.

This seems like the cleanest way forward and I would like to make this small change to the KIP.

Does anybody have any objections?

Thanks,
Apurva
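To make the proposed behavior concrete, calling code might look roughly like the following. NoMetadataAvailableException is only the name suggested above; it is not part of the current client API, so it is stubbed out here purely for illustration:

import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

// Hypothetical exception, as proposed above; it does not exist in the current client.
class NoMetadataAvailableException extends RuntimeException {}

class ProposedMetadataHandling {
    static void sendAndLog(Producer<String, String> producer)
            throws InterruptedException, ExecutionException {
        // The send itself succeeds even when the broker hit a duplicate sequence internally.
        RecordMetadata metadata =
                producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
        try {
            System.out.printf("Appended at offset %d, timestamp %d%n",
                    metadata.offset(), metadata.timestamp());
        } catch (NoMetadataAvailableException e) {
            // The batch was persisted (an internal retry hit a duplicate sequence),
            // but the broker no longer had the original offset/timestamp cached.
            System.out.println("Append succeeded, but no record metadata was returned");
        }
    }
}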
On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <apu...@confluent.io> wrote:

Thanks for the comments Ismael.

I have gone ahead and incorporated all your suggestions in the KIP document. You convinced me on adding max.message.bytes :)

Apurva

On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <ism...@juma.me.uk> wrote:

Thanks for the KIP. +1 (binding) from me. A few minor comments:

1. We should add a note to the backwards compatibility section explaining the impact of throwing DuplicateSequenceException (a new exception) from `send`. As I understand it, it's not an issue, but good to include it in the KIP.

2. For clarity, it's good to highlight in some way the new fields in the protocol definition itself.

3. I understand that you decided not to add max.message.bytes because it's unrelated to this KIP. I'll try to persuade you that we should, but it's not a blocker if you don't agree. The reasons are: 1. The implementation effort to add it is minimal since it's a topic config like message format version, 2. It's clearly beneficial for the producer to have that information, 3. It's compact (just a number), 4. It's nice to avoid another protocol bump for a small change like that.

Thanks,
Ismael

On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <apu...@confluent.io> wrote:

Hi,

I'd like to start a vote for KIP-192:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled

Thanks,
Apurva