The KIP has passed with three binding +1 votes (Guozhang, Ismael, Jason)
and no -1 or +0 votes.

Thanks to everyone for the feedback.

Apurva

On Mon, Sep 11, 2017 at 8:31 PM, Apurva Mehta <apu...@confluent.io> wrote:

> Hi Becket,
>
> You are right: the calculations are per partition produced to by each
> idempotent producer. I think this makes the problem more acute once we
> enable the idempotent producer by default. However, even the most
> optimized version will still result in an overhead of at least 46 bytes
> per (producer, broker, partition) triplet.
>
> I will call this out in KIP-185. I think we would want to optimize the
> memory utilization of each (partition, broker, producer) triplet before
> turning on the default. Another option could be to default max.in.flight to
> 2 and retain the metadata of just 2 batches. This would significantly
> reduce the memory overhead in the default distribution, and in the most
> common cases would not result in produce responses without the record
> metadata. That may be a simple way to address the issue.
>
> Thanks,
> Apurva
>
> On Mon, Sep 11, 2017 at 7:40 PM, Becket Qin <becket....@gmail.com> wrote:
>
>> Hi Apurva,
>>
>> Thanks for the explanation.
>>
>> I think the STO will be per producer/partition, right? Am I missing
>> something? You are right that the proposal does not strengthen the
>> semantics. The goal is more about bounding memory consumption to some
>> reasonable number and using the memory more efficiently.
>>
>> But I agree it is not a blocker for this KIP, as the memory pressure may
>> not be that big. With 5K partitions per broker and 50 producers per
>> partition on average, we are going to consume about 35 MB of memory with
>> 5 entries (142 bytes) in the cache. So it is probably still OK, and it is
>> more urgent to fix the upgrade path.
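
As a rough check of the figures above, the sketch below multiplies out the estimate: 5,000 partitions per broker, 50 producers per partition, and 142 bytes per cached producer entry. The class and method names are illustrative assumptions and are not part of Kafka; the 46-byte figure used in the note afterwards is the single-batch overhead quoted elsewhere in this thread.

```java
// Back-of-the-envelope estimate only; all names here are hypothetical, not Kafka code.
final class ProducerStateMemoryEstimate {
    // Bytes of cached producer state held by one broker.
    static long bytesPerBroker(long partitions, long producersPerPartition, long bytesPerEntry) {
        return partitions * producersPerPartition * bytesPerEntry;
    }

    // Converts bytes to decimal megabytes (1 MB = 1,000,000 bytes).
    static double toMegabytes(long bytes) {
        return bytes / 1_000_000.0;
    }
}
```

Under these assumptions, `bytesPerBroker(5_000, 50, 142)` comes to 35,500,000 bytes (about 35.5 MB), while the same load at 46 bytes per entry comes to 11,500,000 bytes (about 11.5 MB).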
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>>
>>
>> On Mon, Sep 11, 2017 at 4:13 PM, Apurva Mehta <apu...@confluent.io>
>> wrote:
>>
>> > Hi Becket,
>> >
>> > Regarding the current implementation: we opted for a simpler server side
>> > implementation where we _don't_ snapshot the metadata of the last 5
>> > batches to disk. So if a broker fails, comes back online, and is the
>> > leader again, it will only have the last batch in memory. With
>> > max.in.flight = 5, it is thus possible to receive duplicates of 4 prior
>> > batches and yet not have the metadata.
>> >
>> > Since we can return DuplicateSequence to the client, and since this is
>> > considered a successful response on the client, this is a good
>> > solution. It also means that we no longer have a hard limit of
>> > max.in.flight == 5: if you use a larger value, you are more likely to
>> > receive responses without the offset/timestamp metadata.
>> >
>> > Your suggestion for sending the lastAckdSequence per partition would
>> > help reduce memory on the broker, but it won't bound it: we need at
>> > least one cached batch per producer to do sequence number validation so
>> > it will always grow proportional to the number of active producers. Nor
>> > does it uniquely solve the problem of removing the cap on
>> > max.in.flight: producers are still not guaranteed that the metadata for
>> > all their inflight batches will always be cached and returned.
>> >
>> > So it doesn't strengthen any semantics, but it does optimize memory
>> > usage on the broker. But what is the usage with the proposed changes?
>> > With 5 cached batches, each producerIdEntry will be 142 bytes, so 7000
>> > active producers who use idempotence will take about 1 MB per broker.
>> > With 1 cached batch, we save at most 96 bytes per producer, so we could
>> > have at most 21000 active producers using idempotence per MB of broker
>> > memory.
>> >
>> > The savings are significant, but I am not convinced optimizing this is
>> > worth it right now, since we are not making the idempotent producer the
>> > default in this release. Further, there are a bunch of other items that
>> > make exactly-once semantics more usable which we are working on in this
>> > release. Given that there are only 8 days left till feature freeze, I
>> > would rather tackle the usability issues (KIP-192) than make these
>> > memory optimizations on the broker. The payoff from the latter seems
>> > much smaller, and it is always possible to do in the future.
>> >
>> > What does the rest of the community think?
>> >
>> > Thanks,
>> > Apurva
>> >
>> > On Mon, Sep 11, 2017 at 2:39 PM, Becket Qin <becket....@gmail.com>
>> wrote:
>> >
>> > > Hi Apurva,
>> > >
>> > > Sorry for being late on this thread. I am trying to understand the
>> > > implementation of the case in which we throw
>> > > DuplicateSequenceException. My understanding is the following:
>> > > 1. On the broker side, we will cache the 5 most recent
>> > > sequence/timestamp/offset (STO) entries for each producer ID.
>> > > 2. When a duplicate occurs and the producer has max.in.flight.requests
>> > > set to <=5, we can return the timestamp/offset from the cached STO.
>> > > 3. When a duplicate occurs and the producer has max.in.flight.requests
>> > > set to greater than 5, we may need to return
>> > > DuplicateSequenceException just to indicate the sequence is a
>> > > duplicate, but the ProduceResponse will not have a timestamp/offset
>> > > because it may be outside the 5 entries in the cache.
>> > >
>> > > Is my understanding correct? If that is the current implementation, I
>> > > have a few concerns:
>> > > 1. If there are a lot of producers producing to the same cluster (e.g.
>> > > a Map-Reduce job), we may still spend a lot of memory on caching the
>> > > most recent STO.
>> > > 2. In most cases, we are essentially caching
>> > > sequence/timestamp/offset entries that may never be used again.
>> > >
>> > > Since we are making protocol changes, I am wondering if we can improve
>> > > the above two cases by doing the following:
>> > > 1. Add a per-partition LastAckedSequence field to the ProduceRequest.
>> > > 2. The broker will remove the cached STO entries whose sequence is
>> > > less than or equal to LastAckedSequence for each Partition/PID.
>> > > 3. Have a global STO cache to cap the total number of cached STO
>> > > entries at some number, say 1 million. This total is shared by all the
>> > > producers. If this number is reached, we will remove entries from the
>> > > producer that has the most cached STO entries.
>> > > 4. If there is a sequence smaller than the last sequence and there is
>> > > no entry in the STO cache, we return DuplicateSequenceException
>> > > without offset/timestamp.
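
The four steps above can be sketched as a small in-memory cache. This is only an illustration of the proposal, not broker code: `StoCache`, `Sto`, and every method name are hypothetical. It also makes assumptions the proposal does not spell out: sequence wraparound is ignored, and when the global cap is hit the oldest entry of the largest holder is evicted one at a time.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of the proposed cache; not the broker's actual code.
final class StoCache {
    // One cached sequence/timestamp/offset (STO) entry.
    static final class Sto {
        final int sequence;
        final long timestamp;
        final long offset;

        Sto(int sequence, long timestamp, long offset) {
            this.sequence = sequence;
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    // Entries per producer/partition, oldest first.
    private final Map<String, Deque<Sto>> entries = new HashMap<>();
    private final int maxTotalEntries;
    private int totalEntries = 0;

    StoCache(int maxTotalEntries) {
        this.maxTotalEntries = maxTotalEntries;
    }

    private static String key(long producerId, int partition) {
        return producerId + "/" + partition;
    }

    // Step 2: drop entries at or below the producer's LastAckedSequence.
    void ack(long producerId, int partition, int lastAckedSequence) {
        Deque<Sto> queue = entries.get(key(producerId, partition));
        if (queue == null) {
            return;
        }
        while (!queue.isEmpty() && queue.peekFirst().sequence <= lastAckedSequence) {
            queue.pollFirst();
            totalEntries--;
        }
    }

    // Cache a newly appended batch, then enforce the global cap (step 3).
    void add(long producerId, int partition, Sto sto) {
        entries.computeIfAbsent(key(producerId, partition), k -> new ArrayDeque<>()).addLast(sto);
        totalEntries++;
        while (totalEntries > maxTotalEntries) {
            evictFromLargest();
        }
    }

    // Linear scan for the largest holder; acceptable in a sketch, too slow for real use.
    private void evictFromLargest() {
        Deque<Sto> largest = null;
        for (Deque<Sto> queue : entries.values()) {
            if (largest == null || queue.size() > largest.size()) {
                largest = queue;
            }
        }
        if (largest != null) {
            largest.pollFirst();
        }
        totalEntries--;
    }

    // Step 4: an empty result means "duplicate, but no offset/timestamp to return".
    Optional<Sto> lookup(long producerId, int partition, int sequence) {
        Deque<Sto> queue = entries.get(key(producerId, partition));
        if (queue == null) {
            return Optional.empty();
        }
        return queue.stream().filter(s -> s.sequence == sequence).findFirst();
    }

    int size() {
        return totalEntries;
    }
}
```

Note that even in this sketch the cache cannot shrink below what unacknowledged batches require for sequence validation, which is the point raised in the reply about memory growing with the number of active producers.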
>> > >
>> > > With the above changes, we get the following benefits:
>> > > 1. We avoid caching the sequence/timestamp/offset unnecessarily,
>> > > because all the cached entries are entries that have not yet been
>> > > confirmed by the producer.
>> > > 2. No magic number of 5 for max.in.flight.requests.per.connection.
>> > > 3. A bounded memory footprint for the cached
>> > > sequence/timestamp/offset entries.
>> > >
>> > > Hope it's not too late to make these changes, if they make sense.
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > > On Mon, Sep 11, 2017 at 11:21 AM, Apurva Mehta <apu...@confluent.io>
>> > > wrote:
>> > >
>> > > > Thanks for the votes everyone.
>> > > >
>> > > > One of the proposals here was to raise a
>> > > > 'DuplicateSequenceException' to the user if the broker detected that
>> > > > one of the internal retries resulted in a duplicate, and the
>> > > > metadata for the original batch was no longer cached.
>> > > >
>> > > > However, when implementing this change, I realized that this is
>> > > > quite unintuitive from the user's point of view. In reality, the
>> > > > 'duplicate' is only due to internal retries -- something that the
>> > > > user has no visibility into. And secondly, this is not an error: the
>> > > > batch has been persisted, only the cached metadata has been lost.
>> > > >
>> > > > I think the better approach is to return a 'success' but make it
>> > > > clear that there is no record metadata. If the user tries to access
>> > > > the `RecordMetadata.offset` or `RecordMetadata.timestamp` methods of
>> > > > the returned metadata, we can raise a 'NoMetadataAvailableException'
>> > > > or something like that.
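
A minimal sketch of that idea follows. It is not the real `org.apache.kafka.clients.producer.RecordMetadata`: the class `SketchRecordMetadata`, its factory methods, and the exception class are hypothetical names used only to show a "success without metadata" result that fails loudly on access.

```java
// Hypothetical names throughout; this is not Kafka's RecordMetadata.
final class NoMetadataAvailableException extends RuntimeException {
    NoMetadataAvailableException(String message) {
        super(message);
    }
}

final class SketchRecordMetadata {
    private final String topic;
    private final int partition;
    private final boolean hasMetadata;
    private final long offset;
    private final long timestamp;

    private SketchRecordMetadata(String topic, int partition, boolean hasMetadata,
                                 long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.hasMetadata = hasMetadata;
        this.offset = offset;
        this.timestamp = timestamp;
    }

    // Normal case: the broker returned the offset and timestamp.
    static SketchRecordMetadata of(String topic, int partition, long offset, long timestamp) {
        return new SketchRecordMetadata(topic, partition, true, offset, timestamp);
    }

    // Duplicate whose cached metadata is gone: still a success, just without details.
    static SketchRecordMetadata withoutMetadata(String topic, int partition) {
        return new SketchRecordMetadata(topic, partition, false, -1L, -1L);
    }

    String topic() {
        return topic;
    }

    int partition() {
        return partition;
    }

    // Throws instead of silently returning an invalid offset.
    long offset() {
        if (!hasMetadata) {
            throw new NoMetadataAvailableException("offset is not available for this record");
        }
        return offset;
    }

    // Throws instead of silently returning an invalid timestamp.
    long timestamp() {
        if (!hasMetadata) {
            throw new NoMetadataAvailableException("timestamp is not available for this record");
        }
        return timestamp;
    }
}
```

Callers that only read `topic()` and `partition()` behave the same in both cases; only code that reads `offset()` or `timestamp()` on a metadata-less result sees the exception.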
>> > > >
>> > > > This way users who don't access the 'offset' and 'timestamp' fields
>> > > > would not notice a change. For the users who do, the offset and
>> > > > timestamp will not silently be invalid: they will be notified
>> > > > through an exception.
>> > > >
>> > > > This seems like the cleanest way forward and I would like to make
>> > > > this small change to the KIP.
>> > > >
>> > > > Does anybody have any objections?
>> > > >
>> > > > Thanks,
>> > > > Apurva
>> > > >
>> > > >
>> > > >
>> > > > On Thu, Sep 7, 2017 at 9:44 PM, Apurva Mehta <apu...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Thanks for the comments Ismael.
>> > > > >
>> > > > > I have gone ahead and incorporated all your suggestions in the KIP
>> > > > > document. You convinced me on adding max.message.bytes :)
>> > > > >
>> > > > > Apurva
>> > > > >
>> > > > > On Thu, Sep 7, 2017 at 6:12 PM, Ismael Juma <ism...@juma.me.uk>
>> > wrote:
>> > > > >
>> > > > >> Thanks for the KIP. +1 (binding) from me. A few minor comments:
>> > > > >>
>> > > > >> 1. We should add a note to the backwards compatibility section
>> > > > >> explaining the impact of throwing DuplicateSequenceException (a
>> > > > >> new exception) from `send`. As I understand it, it's not an
>> > > > >> issue, but good to include it in the KIP.
>> > > > >>
>> > > > >> 2. For clarity, it's good to highlight in some way the new fields
>> > > > >> in the protocol definition itself
>> > > > >>
>> > > > >> 3. I understand that you decided not to add max.message.bytes
>> > > > >> because it's unrelated to this KIP. I'll try to persuade you that
>> > > > >> we should, but it's not a blocker if you don't agree. The reasons
>> > > > >> are: 1. The implementation effort to add it is minimal since it's
>> > > > >> a topic config like message format version, 2. It's clearly
>> > > > >> beneficial for the producer to have that information, 3. It's
>> > > > >> compact (just a number), 4. It's nice to avoid another protocol
>> > > > >> bump for a small change like that.
>> > > > >>
>> > > > >> Thanks,
>> > > > >> Ismael
>> > > > >>
>> > > > >> On Thu, Sep 7, 2017 at 9:51 PM, Apurva Mehta <apu...@confluent.io>
>> > > > >> wrote:
>> > > > >>
>> > > > >> > Hi,
>> > > > >> >
>> > > > >> > I'd like to start a vote for KIP-192:
>> > > > >> >
>> > > > >> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-192+%3A+Provide+cleaner+semantics+when+idempotence+is+enabled
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> > Apurva
>> > > > >> >
>> > > > >>
>> > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>
>
