Thanks for your email Becket. I would be interested in hearing others opinions on which should be a better default between acks=1 and acks=all.
One important point on which I disagree is your statement that 'users need to do a lot of work to get exactly-once with acks=all'. This is debatable. If we enable acks=all, and if we ship with sane topic-level configs (like disabling unclean leader election), then users will get produce exceptions with the default settings only for authorization and config exceptions, or exceptions due to correlated hard failures or software bugs (assuming replication-factor > 1, which is when acks=all and acks=1 differ). This should be sufficiently rare that expecting apps to shut down and have manual intervention to ensure data consistency is not unreasonable. So users will not have to have complicated code to ensure exactly-once in their app with my proposed defaults: just shut down the producer when a `send` returns an error and check manually if you really care about exactly-once. The latter should happen so rarely that I argue that it would be worth the cost. And if all else fails, there are still ways to recover automatically, but those are then very complex as you pointed out. Regarding max.in.flight: again, given the nature of the idempotence feature, we have to bound it. One trade off is that if you have this cross-dc use case with extremely high client/broker latency, you either accept lower performance with idempotence (and max.in.flight=5), or disable idempotence and keep max.in.flight at 20 or whatever. I think this is a fair tradeoff. Thanks, Apurva On Fri, Aug 11, 2017 at 11:45 AM, Becket Qin <becket....@gmail.com> wrote: > Hi Apurva, > > I agree that most changes we are talking about here are for default values > of the configurations and users can always override them. So I think the > question to ask is more about the out of the box experience. If the change > makes strict improvement compared with the current settings, that would > make a lot of sense. (e.g. idempotence + pipelined produce requests). On > the other hand, if the out of the box experience is not strictly improved, > but just default to address another scenario, we may need to think about > that a bit more (e.g. acks=all). > > The way I view this is the following: For the users who wants exactly once, > they need to do a lot of extra work even if we do all the right > configurations. That means for those users, they need to understand all the > failure cases and properly handle them. For those users, they probably > already understand (or at least needs to understand) how to configure the > cluster. So providing the default configurations for them do not provide > much additional benefit. For the other users, who care about low latency > and high throughput but not require the most strong semantic, shipping the > default settings to be the strong semantic at the cost of latency and > throughput will force them to look into the configurations and tune for > throughput and latency, which is something they don't need to in the > previous versions. Therefore, I feel it may not be necessary to ship Kafka > with the strongest guarantee. > > In terms of the max.in.flight.request. In some long latency pipeline, (e.g > cross ocean pipeline), the latency could be a couple of hundreds ms. > Assuming we have 10 Gbps bandwidth and 10 MB average produce request size. > When the latency is 200 ms, because each requests takes about 10 ms to > send, we need to have max.in.flight.requests ~ 20 in order to fully utilize > the network bandwidth. When the requests are smaller, we will need to > pipeline more requests. > > Thanks, > > Jiangjie (Becket) Qin > > > > > On Thu, Aug 10, 2017 at 10:43 PM, Apurva Mehta <apu...@confluent.io> > wrote: > > > Hi Dong, > > > > Thanks for your comments. > > > > Yes, with retries=MAX_INT, producer.flush() may block. I think there are > > two solutions: a good one would be to adopt some form of KIP-91 to bound > > the time a message can remain unacknowledged. Alternately, we could set > the > > default retries to 10 or something. I prefer implementing KIP-91 along > with > > this KIP to solve this problem, but it isn't a strong dependency. > > > > Yes, OutOfOrderSequence is a new exception. It indicates a previously > > acknowledged message was lost. This could happen even today, but there is > > no way for the client to detect it. With KIP-98 and the new sequence > > numbers, we can. If applications ignore it, they would have the same > > behavior as the already have, except with the explicit knowledge that > > something has been lost. > > > > Finally, from my perspective, the best the reason to make acks=all the > > default is that it would be a coherent default to have. Along with > enabling > > idempotence, acks=all, and retries=MAX_INT would mean that acknowledged > > messages would appear in the log exactly once. The 'fatal' exceptions > would > > be either AuthorizationExceptions, ConfigExceptions, or rare data loss > > issues due to concurrent failures or software bugs. So while this is not > a > > guarantee of exactly once, it is practically as close to it as you can > get. > > I think this is a strong enough reason to enable acks=all. > > > > Thanks, > > Apurva > > > > > > On Thu, Aug 10, 2017 at 1:04 AM, Dong Lin <lindon...@gmail.com> wrote: > > > > > Hey Apurva, > > > > > > Thanks for the KIP. I have read through the KIP and the prior > discussion > > in > > > this thread. I have three concerns that are related to Becket's > comments: > > > > > > - Is it true that, as Becket has mentioned, producer.flush() may block > > > infinitely if retries=MAX_INT? This seems like a possible reason to > break > > > user's application. I think we probably should avoid causing > correctness > > > penalty for application. > > > > > > - It seems that OutOfOrderSequenceException will be a new exception > > thrown > > > to user after this config change. Can you clarify whether this will > cause > > > correctness penalty for application? > > > > > > - It is not very clear to me whether the benefit of increasing acks > from > > 1 > > > to all is worth the performance hit. For users who have not already > > > overridden acks to all, it is very likely that they are not already > doing > > > other complicated work (e.g. close producer in callback) that are > > necessary > > > for exactly-once delivery. Thus those users won't have exactly-once > > > semantics by simply picking up the change in the default acks > > > configuration. It seems that the only benefit of this config change is > > the > > > well-known tradeoff between performance and message loss rate. I am not > > > sure this is a strong reason to risk reducing existing user's > > performance. > > > > > > I think my point is that we should not to make change that will break > > > user's existing application. And we should try to avoid reducing user's > > > performance unless there is strong benefit of doing so (e.g. > > exactly-once). > > > > > > Thanks, > > > Dong > > > > > > > > > > > > > > > On Wed, Aug 9, 2017 at 10:43 PM, Apurva Mehta <apu...@confluent.io> > > wrote: > > > > > > > Thanks for your email Becket. > > > > > > > > Your observations around using acks=1 and acks=-1 are correct. Do > note > > > that > > > > getting an OutOfOrderSequence means that acknowledged data has been > > lost. > > > > This could be due to a weaker acks setting like acks=1 or due to a > > topic > > > > which is not configured to handle broker failures cleanly (unclean > > leader > > > > election is enabled, etc.). Either way, you are right in observing > that > > > if > > > > an app is very serious about having exactly one copy of each ack'd > > > message > > > > in the log, it is a significant effort to recover from this error. > > > > > > > > However, I propose an alternate way of thinking about this: is it > > > > worthwhile shipping Kafka with the defaults tuned for strong > semantics? > > > > That is essentially what is being proposed here, and of course there > > will > > > > be tradeoffs with performance and deployment costs-- you can't have > > your > > > > cake and eat it too. > > > > > > > > And if we want to ship Kafka with strong semantics by default, we > might > > > > want to make the default topic level settings as well as the client > > > > settings more robust. This means, for instance, disabling unclean > > leader > > > > election by default. If there are other configs we need to change on > > the > > > > broker side to ensure that ack'd messages are not lost due to > transient > > > > failures, we should change those as well as part of a future KIP. > > > > > > > > Personally, I think that the defaults should provide robust > guarantees. > > > > > > > > And this brings me to another point: these are just proposed > defaults. > > > > Nothing is being taken away in terms of flexibility to tune for > > different > > > > behavior. > > > > > > > > Finally, the way idempotence is implemented means that there needs to > > be > > > > some cap on max.in.flight when idempotence is enabled -- that is > just a > > > > tradeoff of the feature. Do we have any data that there are > > installations > > > > which benefit greatly for a value of max.in.flight > 5? For instance, > > > > LinkedIn probably has the largest and most demanding deployment of > > Kafka. > > > > Are there any applications which use max.inflight > 5? That would be > > good > > > > data to have. > > > > > > > > Thanks, > > > > Apurva > > > > > > > > > > > > > > > > > > > > > > > > On Wed, Aug 9, 2017 at 2:59 PM, Becket Qin <becket....@gmail.com> > > wrote: > > > > > > > > > Thanks for the KIP, Apurva. It is a good time to review the > > > > configurations > > > > > to see if we can improve the user experience. We also might need to > > > think > > > > > from users standpoint about the out of the box experience. > > > > > > > > > > 01. Generally speaking, I think it makes sense to make > > idempotence=true > > > > so > > > > > we can enable producer side pipeline without ordering issue. > However, > > > the > > > > > impact is that users may occasionally receive > > > OutOfOrderSequencException. > > > > > In this case, there is not much user can do if they want to ensure > > > > > ordering. They basically have to close the producer in the call > back > > > and > > > > > resend all the records that are in the RecordAccumulator. This is > > very > > > > > involved. And the users may not have a way to retrieve the Records > in > > > the > > > > > accumulator anymore. So for the users who really want to achieve > the > > > > > exactly once semantic, there are actually still a lot of work to do > > > even > > > > > with those default. For the rest of the users, they need to handle > > one > > > > more > > > > > exception, which might not be a big deal. > > > > > > > > > > 02. Setting acks=-1 would significantly reduce the likelihood of > > > > > OutOfOrderSequenceException from happening. However, the > > > > latency/throughput > > > > > impact and additional purgatory burden on the broker are big > > concerns. > > > > And > > > > > it does not really guarantee exactly once without broker side > > > > > configurations. i.e unclean.leader.election, min.isr, etc. I am not > > > sure > > > > if > > > > > it is worth making acks=-1 a global config instead of letting the > > users > > > > who > > > > > are really care about this to configure correctly. > > > > > > > > > > 03. Regarding retries, I think we had some discussion in KIP-91. > The > > > > > problem of setting retries to max integer is that producer.flush() > > may > > > > take > > > > > forever. Will this KIP be depending on KIP-91? > > > > > > > > > > I am not sure about having a cap on the max.in.flight.requests. It > > > seems > > > > > that on some long RTT link, sending more requests in the pipeline > > would > > > > be > > > > > the only way to keep the latency to be close to RTT. > > > > > > > > > > Thanks, > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > On Wed, Aug 9, 2017 at 11:28 AM, Apurva Mehta <apu...@confluent.io > > > > > > wrote: > > > > > > > > > > > Thanks for the comments Ismael and Jason. > > > > > > > > > > > > Regarding the OutOfOrderSequenceException, it is more likely when > > you > > > > > > enable idempotence and have acks=1, simply because you have a > > greater > > > > > > probability of losing acknowledged data with acks=1, and the > error > > > code > > > > > > indicates that. > > > > > > > > > > > > The particular scenario is that a broker acknowledges a message > > with > > > > > > sequence N before replication happens, and then crashes. Since > the > > > > > message > > > > > > was acknowledged the producer increments its sequence to N+1. The > > new > > > > > > leader would not have received the message, and still expects > > > sequence > > > > N > > > > > > from the producer. When it receives N+1 for the next message, it > > will > > > > > > return an OutOfOrderSequenceNumber, correctl/y indicating some > > > > previously > > > > > > acknowledged messages are missing. > > > > > > > > > > > > For the idempotent producer alone, the > OutOfOrderSequenceException > > is > > > > > > returned in the Future and Callback, indicating to the > application > > > that > > > > > > some acknowledged data was lost. However, the application can > > > continue > > > > > > producing data using the producer instance. The only > compatibility > > > > issue > > > > > > here is that the application will now see a new exception for a > > state > > > > > which > > > > > > went previously undetected. > > > > > > > > > > > > For a transactional producer, an OutOfOrderSequenceException is > > fatal > > > > and > > > > > > the application must use a new instance of the producer. > > > > > > > > > > > > Another point about acks=1 with enable.idempotence=true. What > > > semantics > > > > > are > > > > > > we promising here? Essentially we are saying that the default > mode > > > > would > > > > > be > > > > > > 'if a message is in the log, it will occur only once, but all > > > > > acknowledged > > > > > > messages may not make it to the log'. I don't think that this is > a > > > > > > desirable default guarantee. > > > > > > > > > > > > I will update the KIP to indicate that with the new default, > > > > applications > > > > > > might get a new 'OutOfOrderSequenceException'. > > > > > > > > > > > > Thanks, > > > > > > Apurva > > > > > > > > > > > > On Wed, Aug 9, 2017 at 9:33 AM, Ismael Juma <ism...@juma.me.uk> > > > wrote: > > > > > > > > > > > > > Hi Jason, > > > > > > > > > > > > > > Thanks for the correction. See inline. > > > > > > > > > > > > > > On Wed, Aug 9, 2017 at 5:13 PM, Jason Gustafson < > > > ja...@confluent.io> > > > > > > > wrote: > > > > > > > > > > > > > > > Minor correction: the OutOfOrderSequenceException is not > fatal > > > for > > > > > the > > > > > > > > idempotent producer and it is not necessarily tied to the > acks > > > > > setting > > > > > > > > (though it is more likely to be thrown with acks=1). > > > > > > > > > > > > > > > > > > > > > Right, it would be worth expanding on the specifics of this. My > > > > > > > understanding is that common failure scenarios could trigger > it. > > > > > > > > > > > > > > > > > > > > > > It is used to signal > > > > > > > > the user that there was a gap in the delivery of messages. > You > > > can > > > > > hit > > > > > > > this > > > > > > > > if there is a pause on the producer and the topic retention > > kicks > > > > in > > > > > > and > > > > > > > > deletes the last records the producer had written. However, > it > > is > > > > > > > possible > > > > > > > > for the user to catch it and simply keep producing > (internally > > > the > > > > > > > producer > > > > > > > > will generate a new ProducerId). > > > > > > > > > > > > > > > > > > > > > I see, our documentation states that it's fatal in the > following > > > > > example > > > > > > > and in the `send` method. I had overlooked that this was > > mentioned > > > in > > > > > the > > > > > > > context of transactions. If we were to enable idempotence by > > > default, > > > > > > we'd > > > > > > > want to flesh out the docs for idempotence without > transactions. > > > > > > > > > > > > > > * try { > > > > > > > * producer.beginTransaction(); > > > > > > > * for (int i = 0; i < 100; i++) > > > > > > > * producer.send(new ProducerRecord<>("my-topic", > > > > > > > Integer.toString(i), Integer.toString(i))); > > > > > > > * producer.commitTransaction(); > > > > > > > * } catch (ProducerFencedException | > OutOfOrderSequenceException > > | > > > > > > > AuthorizationException e) { > > > > > > > * // We can't recover from these exceptions, so our only > > option > > > > is > > > > > > > to close the producer and exit. > > > > > > > * producer.close(); > > > > > > > * } catch (KafkaException e) { > > > > > > > * // For all other exceptions, just abort the transaction > and > > > try > > > > > > > again. > > > > > > > * producer.abortTransaction(); > > > > > > > * } > > > > > > > * producer.close(); > > > > > > > > > > > > > > Nevertheless, pre-idempotent-producer code > > > > > > > > won't be expecting this exception, and that may cause it to > > break > > > > in > > > > > > > cases > > > > > > > > where it previously wouldn't. This is probably the biggest > risk > > > of > > > > > the > > > > > > > > change. > > > > > > > > > > > > > > > > > > > > > > This is a good point and we should include it in the KIP. > > > > > > > > > > > > > > Ismael > > > > > > > > > > > > > > > > > > > > > > > > > > > >