Hi Jason, I see what you mean. That makes sense. So in the above case, after the producer resets the PID, when it retries batch_0_tp1 the batch will still carry the old PID, even though the producer has already obtained a new PID.
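To make that invariant concrete, here is a minimal, hypothetical sketch (the class and field names are illustrative, not the actual producer internals): a batch is stamped with a PID/sequence the first time it is drained, and a retry re-sends it with that same stamp even if the producer has since reset and obtained a new PID for later batches.

final class StampedBatch {
    final long producerId;   // PID assigned when the batch was first drained
    final int baseSequence;  // base sequence assigned at the same time

    StampedBatch(long producerId, int baseSequence) {
        this.producerId = producerId;
        this.baseSequence = baseSequence;
    }
}

final class RetryPidExample {
    public static void main(String[] args) {
        long oldPid = 1000L;                                   // PID when batch_0_tp1 was first sent
        StampedBatch batch0Tp1 = new StampedBatch(oldPid, 0);  // stamp is fixed at first send

        long newPid = 2000L;                                   // producer resets and gets a new PID
        // The retry still carries the old PID/sequence, not the producer's current PID:
        System.out.printf("retry pid=%d seq=%d (current producer pid=%d)%n",
                batch0Tp1.producerId, batch0Tp1.baseSequence, newPid);
    }
}

(This only illustrates the behavior being discussed; the real Sender/RecordAccumulator code paths are more involved.)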
@Jun, do you mean max(remaining delivery.timeout.ms, request.timeout.ms) instead of min(remaining delivery.timeout.ms, request.timeout.ms)? Thanks, Jiangjie (Becket) Qin On Fri, Aug 25, 2017 at 9:34 AM, Jun Rao <j...@confluent.io> wrote: > Hi, Becket, > > Good point on expiring inflight requests. Perhaps we can expire an inflight > request after min(remaining delivery.timeout.ms, request.timeout.ms). This > way, if a user sets a high delivery.timeout.ms, we can still recover from > broker power outage sooner. > > Thanks, > > Jun > > On Thu, Aug 24, 2017 at 12:52 PM, Becket Qin <becket....@gmail.com> wrote: > > > Hi Jason, > > > > delivery.timeout.ms sounds good to me. > > > > I was referring to the case that we are resetting the PID/sequence after > > expire a batch. This is more about the sending the batches after the > > expired batch. > > > > The scenario being discussed is expiring one of the batches in a > in-flight > > request and retry the other batches in the that in-flight request. So > > consider the following case: > > 1. Producer sends request_0 with two batches (batch_0_tp0 and > batch_0_tp1). > > 2. Broker receives the request enqueued the request to the log. > > 3. Before the producer receives the response from the broker, batch_0_tp0 > > expires. The producer will expire batch_0_tp0 immediately, resets PID, > and > > then resend batch_0_tp1, and maybe send batch_1_tp0 (i.e. the next batch > to > > the expired batch) as well. > > > > For batch_0_tp1, it is OK to reuse PID and and sequence number. The > problem > > is for batch_1_tp0, If we reuse the same PID and the broker has already > > appended batch_0_tp0, the broker will think batch_1_tp0 is a duplicate > with > > the same sequence number. As a result broker will drop batch_0_tp1. That > is > > why we have to either bump up sequence number or reset PID. To avoid this > > complexity, I was suggesting not expire the in-flight batch immediately, > > but wait for the produce response. If the batch has been successfully > > appended, we do not expire it. Otherwise, we expire it. > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > > > > > On Thu, Aug 24, 2017 at 11:26 AM, Jason Gustafson <ja...@confluent.io> > > wrote: > > > > > @Becket > > > > > > Good point about unnecessarily resetting the PID in cases where we know > > the > > > request has failed. Might be worth opening a JIRA to try and improve > > this. > > > > > > So if we expire the batch prematurely and resend all > > > > the other batches in the same request, chances are there will be > > > > duplicates. If we wait for the response instead, it is less likely to > > > > introduce duplicates, and we may not need to reset the PID. > > > > > > > > > Not sure I follow this. Are you assuming that we change the batch > > > PID/sequence of the retried batches after resetting the PID? I think we > > > probably need to ensure that when we retry a batch, we always use the > > same > > > PID/sequence. > > > > > > By the way, as far as naming, `max.message.delivery.wait.ms` is quite > a > > > mouthful. Could we shorten it? Perhaps `delivery.timeout.ms`? > > > > > > -Jason > > > > > > On Wed, Aug 23, 2017 at 8:51 PM, Becket Qin <becket....@gmail.com> > > wrote: > > > > > > > Hi Jun, > > > > > > > > If TCP timeout is longer than request.timeout.ms, the producer will > > > always > > > > hit request.timeout.ms before hitting TCP timeout, right? That is > why > > we > > > > added request.timeout.ms in the first place. > > > > > > > > You are right. 
Currently we are reset the PID and resend the batches > to > > > > avoid OutOfOrderSequenceException when the expired batches are in > > retry. > > > > > > > > This does not distinguish the reasons that caused the retry. There > are > > > two > > > > cases: > > > > 1. If the batch was in retry because it received an error response > > (e.g. > > > > NotLeaderForPartition), we actually don't need to reset PID in this > > case > > > > because we know that broker did not accept it. > > > > 2. If the batch was in retry because it hit a timeout earlier, then > we > > > > should reset the PID (or optimistically send and only reset PID when > > > > receive OutOfOrderSequenceException?) > > > > Case 1 is probably the most common case, so it looks that we are > > > resetting > > > > the PID more often than necessary. But because in case 1 the broker > > does > > > > not have the batch, there isn't much impact on resting PID and resend > > > other > > > > than the additional round trip. > > > > > > > > Now we are introducing another case: > > > > 3. A batch is in retry because we expired an in-flight request before > > it > > > > hits request.timeout.ms. > > > > > > > > The difference between 2 and 3 is that in case 3 likely the broker > has > > > > appended the messages. So if we expire the batch prematurely and > resend > > > all > > > > the other batches in the same request, chances are there will be > > > > duplicates. If we wait for the response instead, it is less likely to > > > > introduce duplicates, and we may not need to reset the PID. > > > > > > > > That said, given that batch expiration is probably already rare > enough, > > > so > > > > it may not be necessary to optimize for that. > > > > > > > > Thanks, > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > On Wed, Aug 23, 2017 at 5:01 PM, Jun Rao <j...@confluent.io> wrote: > > > > > > > > > Hi, Becket, > > > > > > > > > > If a message expires while it's in an inflight produce request, the > > > > > producer will get a new PID if idempotent is enabled. This will > > prevent > > > > > subsequent messages from hitting OutOfOrderSequenceException. The > > issue > > > > of > > > > > not expiring an inflight request is that if a broker server goes > down > > > > hard > > > > > (e.g. power outage), the time that it takes for the client to > detect > > > the > > > > > socket level error (this will be sth like 8+ minutes with the > default > > > TCP > > > > > setting) is much longer than the default request.timeout.ms. > > > > > > > > > > Hi, Sumant, > > > > > > > > > > We can probably just default max.message.delivery.wait.ms to 30 > > secs, > > > > the > > > > > current default for request.timeout.ms. > > > > > > > > > > Thanks, > > > > > > > > > > Jun > > > > > > > > > > > > > > > On Wed, Aug 23, 2017 at 3:38 PM, Sumant Tambe <suta...@gmail.com> > > > wrote: > > > > > > > > > > > OK. Looks like starting the clock after closing the batch has > > quite a > > > > few > > > > > > pitfalls. I can't think of a way of to work around it without > > adding > > > > yet > > > > > > another config. So I won't discuss that here. Anyone? As I said > > > > earlier, > > > > > > I'm not hung up on super-accurate notification times. > > > > > > > > > > > > If we are going down the max.message.delievery.wait.ms route, > what > > > > would > > > > > > be > > > > > > the default? There seem to be a few options. > > > > > > > > > > > > 1. max.message.delievery.wait.ms=null. Nothing changes for those > > who > > > > > don't > > > > > > set it. 
I.e., batches expire after request.timeout.ms in > > > accumulator. > > > > If > > > > > > they are past the accumulator stage, timeout after retries*( > > > > > > request.timeout.ms+backoff). > > > > > > > > > > > > 2. max.message.delivery.wait.ms=request.timeout.ms. No obervable > > > > > > behavioral > > > > > > change at the accumulator level as timeout value is same as > before. > > > > > Retries > > > > > > will be done if as long as batch is under > > > max.message.delivery.wait.ms > > > > . > > > > > > However, a batch can expire just after one try. That's ok IMO > > because > > > > > > request.timeout.ms tend to be large (Default 30000). > > > > > > > > > > > > 3. max.message.delivery.wait.ms=2*request.timeout.ms. Give > > > opportunity > > > > > for > > > > > > two retries but warn that retries may not happen at all in some > > rare > > > > > > cases and a batch could expire before any attempt. > > > > > > > > > > > > 4. max.message.delivery.wait.ms=something else (a constant?) > > > > > > > > > > > > Thoughts? > > > > > > > > > > > > On 23 August 2017 at 09:01, Ismael Juma <ism...@juma.me.uk> > wrote: > > > > > > > > > > > > > Thanks Becket, that seems reasonable. Sumant, would you be > > willing > > > to > > > > > > > update the KIP based on the discussion or are you still not > > > > convinced? > > > > > > > > > > > > > > Ismael > > > > > > > > > > > > > > On Wed, Aug 23, 2017 at 6:04 AM, Becket Qin < > > becket....@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > > In general max.message.delivery.wait.ms is a cleaner > approach. > > > > That > > > > > > > would > > > > > > > > make the guarantee clearer. That said, there seem subtleties > in > > > > some > > > > > > > > scenarios: > > > > > > > > > > > > > > > > 1. I agree with Sumante that it is a little weird that a > > message > > > > > could > > > > > > be > > > > > > > > expired immediately if it happens to enter a batch that is > > about > > > to > > > > > be > > > > > > > > expired. But as Jun said, as long as we have multiple > messages > > > in a > > > > > > > batch, > > > > > > > > there isn't a cheap way to achieve a precise timeout. So the > > > > question > > > > > > > > actually becomes whether it is more user-friendly to expire > > early > > > > > > (based > > > > > > > on > > > > > > > > the batch creation time) or expire late (based on the batch > > close > > > > > > time). > > > > > > > I > > > > > > > > think both are acceptable. Personally I think most users do > not > > > > > really > > > > > > > care > > > > > > > > about expire a little late as long as it eventually expires. > > So I > > > > > would > > > > > > > use > > > > > > > > batch close time as long as there is a bound on that. But it > > > looks > > > > > that > > > > > > > we > > > > > > > > do not really have a bound on when we will close a batch. So > > > > > expiration > > > > > > > > based on batch create time may be the only option if we don't > > > want > > > > to > > > > > > > > introduce complexity. > > > > > > > > > > > > > > > > 2. If we timeout a batch in a request when it is still in > > flight, > > > > the > > > > > > end > > > > > > > > result of that batch is unclear to the users. It would be > weird > > > > that > > > > > > user > > > > > > > > receive exception saying those messages are expired while > they > > > > > actually > > > > > > > > have been sent successfully. Also if idempotence is set to > > true, > > > > what > > > > > > > would > > > > > > > > the next sequence ID be after the expired batch? 
Reusing the > > same > > > > > > > sequence > > > > > > > > Id may result in data loss, and increment the sequence ID may > > > cause > > > > > > > > OutOfOrderSequenceException. Besides, extracting an expired > > batch > > > > > from > > > > > > a > > > > > > > > request also introduces some complexity. Again, personally I > > > think > > > > it > > > > > > is > > > > > > > > fine to expire a little bit late. So maybe we don't need to > > > expire > > > > a > > > > > > > batch > > > > > > > > that is already in flight. In the worst case we will expire > it > > > with > > > > > > delay > > > > > > > > of request.timeout.ms. > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > > Jiangjie (Becket) Qin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Aug 22, 2017 at 3:08 AM, Ismael Juma < > > ism...@juma.me.uk> > > > > > > wrote: > > > > > > > > > > > > > > > >> Hi all, > > > > > > > >> > > > > > > > >> The discussion has been going on for a while, would it help > to > > > > have > > > > > a > > > > > > > >> call to discuss this? I'd like to start a vote soonish so > that > > > we > > > > > can > > > > > > > >> include this in 1.0.0. I personally prefer > > > > > > max.message.delivery.wait.ms > > > > > > > . > > > > > > > >> It seems like Jun, Apurva and Jason also prefer that. > Sumant, > > it > > > > > seems > > > > > > > like > > > > > > > >> you still prefer a batch.expiry.ms, is that right? What are > > > your > > > > > > > >> thoughts Joel and Becket? > > > > > > > >> > > > > > > > >> Ismael > > > > > > > >> > > > > > > > >> On Wed, Aug 16, 2017 at 6:34 PM, Jun Rao <j...@confluent.io> > > > > wrote: > > > > > > > >> > > > > > > > >>> Hi, Sumant, > > > > > > > >>> > > > > > > > >>> The semantics of linger.ms is a bit subtle. The reasoning > > for > > > > the > > > > > > > >>> current > > > > > > > >>> implementation is the following. Let's say one sets > > linger.ms > > > > to 0 > > > > > > > (our > > > > > > > >>> current default value). Creating a batch for every message > > will > > > > be > > > > > > bad > > > > > > > >>> for > > > > > > > >>> throughput. Instead, the current implementation only forms > a > > > > batch > > > > > > when > > > > > > > >>> the > > > > > > > >>> batch is sendable (i.e., broker is available, inflight > > request > > > > > limit > > > > > > is > > > > > > > >>> not > > > > > > > >>> exceeded, etc). That way, the producer has more chance for > > > > > batching. > > > > > > > The > > > > > > > >>> implication is that a batch could be closed longer than > > > > linger.ms. > > > > > > > >>> > > > > > > > >>> Now, on your concern about not having a precise way to > > control > > > > > delay > > > > > > in > > > > > > > >>> the > > > > > > > >>> accumulator. It seems the batch.expiry.ms approach will > have > > > the > > > > > > same > > > > > > > >>> issue. If you start the clock when a batch is initialized, > > you > > > > may > > > > > > > expire > > > > > > > >>> some messages in the same batch early than batch.expiry.ms > . > > If > > > > you > > > > > > > start > > > > > > > >>> the clock when the batch is closed, the expiration time > could > > > be > > > > > > > >>> unbounded > > > > > > > >>> because of the linger.ms implementation described above. > > > > Starting > > > > > > the > > > > > > > >>> expiration clock on batch initialization will at least > > > guarantee > > > > > the > > > > > > > time > > > > > > > >>> to expire the first message is precise, which is probably > > good > > > > > > enough. 
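As a rough illustration of the "start the clock at batch initialization" option discussed above (the helper class and names are hypothetical, not the actual RecordAccumulator code):

final class BatchExpiryCheck {
    // Expiry is measured from the time the batch was created, so the first
    // record appended has waited exactly (nowMs - createdMs); later records in
    // the same batch have waited less and can therefore be expired "early",
    // which is the trade-off discussed in this thread.
    static boolean isExpired(long createdMs, long nowMs, long expiryMs) {
        return nowMs - createdMs >= expiryMs;
    }

    public static void main(String[] args) {
        long createdMs = System.currentTimeMillis() - 35_000;  // batch created 35s ago
        System.out.println(isExpired(createdMs, System.currentTimeMillis(), 30_000));  // true
    }
}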
> > > > > > > >>> > > > > > > > >>> Thanks, > > > > > > > >>> > > > > > > > >>> Jun > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> On Tue, Aug 15, 2017 at 3:46 PM, Sumant Tambe < > > > suta...@gmail.com > > > > > > > > > > > > wrote: > > > > > > > >>> > > > > > > > >>> > Question about "the closing of a batch can be delayed > > longer > > > > than > > > > > > > >>> > linger.ms": > > > > > > > >>> > Is it possible to cause an indefinite delay? At some > point > > > > bytes > > > > > > > limit > > > > > > > >>> > might kick in. Also, why is closing of a batch coupled > with > > > > > > > >>> availability of > > > > > > > >>> > its destination? In this approach a batch chosen for > > eviction > > > > due > > > > > > to > > > > > > > >>> delay > > > > > > > >>> > needs to "close" anyway, right (without regards to > > > destination > > > > > > > >>> > availability)? > > > > > > > >>> > > > > > > > > >>> > I'm not too worried about notifying at super-exact time > > > > specified > > > > > > in > > > > > > > >>> the > > > > > > > >>> > configs. But expiring before the full wait-span has > elapsed > > > > > sounds > > > > > > a > > > > > > > >>> little > > > > > > > >>> > weird. So expiration time has a +/- spread. It works more > > > like > > > > a > > > > > > hint > > > > > > > >>> than > > > > > > > >>> > max. So why not message.delivery.wait.hint.ms? > > > > > > > >>> > > > > > > > > >>> > Yeah, cancellable future will be similar in complexity. > > > > > > > >>> > > > > > > > > >>> > I'm unsure if max.message.delivery.wait.ms will the > final > > > nail > > > > > for > > > > > > > >>> > producer > > > > > > > >>> > timeouts. We still won't have a precise way to control > > delay > > > in > > > > > > just > > > > > > > >>> the > > > > > > > >>> > accumulator segment. batch.expiry.ms does not try to > > > abstract. > > > > > > It's > > > > > > > >>> very > > > > > > > >>> > specific. > > > > > > > >>> > > > > > > > > >>> > My biggest concern at the moment is implementation > > > complexity. > > > > > > > >>> > > > > > > > > >>> > At this state, I would like to encourage other > independent > > > > > > opinions. > > > > > > > >>> > > > > > > > > >>> > Regards, > > > > > > > >>> > Sumant > > > > > > > >>> > > > > > > > > >>> > On 11 August 2017 at 17:35, Jun Rao <j...@confluent.io> > > > wrote: > > > > > > > >>> > > > > > > > > >>> > > Hi, Sumant, > > > > > > > >>> > > > > > > > > > >>> > > 1. Yes, it's probably reasonable to require > > > > > > > >>> max.message.delivery.wait.ms > > > > > > > >>> > > > > > > > > > >>> > > linger.ms. As for retries, perhaps we can set the > > default > > > > > > retries > > > > > > > to > > > > > > > >>> > > infinite or just ignore it. Then the latency will be > > > bounded > > > > by > > > > > > > >>> > > max.message.delivery.wait.ms. request.timeout.ms is > the > > > max > > > > > time > > > > > > > the > > > > > > > >>> > > request will be spending on the server. The client can > > > expire > > > > > an > > > > > > > >>> inflight > > > > > > > >>> > > request early if needed. > > > > > > > >>> > > > > > > > > > >>> > > 2. Well, since max.message.delivery.wait.ms specifies > > the > > > > max, > > > > > > > >>> calling > > > > > > > >>> > the > > > > > > > >>> > > callback a bit early may be ok? Note that > > > > > > > >>> max.message.delivery.wait.ms > > > > > > > >>> > > only > > > > > > > >>> > > comes into play in the rare error case. 
So, I am not > sure > > > if > > > > we > > > > > > > need > > > > > > > >>> to > > > > > > > >>> > be > > > > > > > >>> > > very precise. The issue with starting the clock on > > closing > > > a > > > > > > batch > > > > > > > is > > > > > > > >>> > that > > > > > > > >>> > > currently if the leader is not available, the closing > of > > a > > > > > batch > > > > > > > can > > > > > > > >>> be > > > > > > > >>> > > delayed longer than linger.ms. > > > > > > > >>> > > > > > > > > > >>> > > 4. As you said, future.get(timeout) itself doesn't > solve > > > the > > > > > > > problem > > > > > > > >>> > since > > > > > > > >>> > > you still need a way to expire the record in the > sender. > > > The > > > > > > amount > > > > > > > >>> of > > > > > > > >>> > work > > > > > > > >>> > > to implement a cancellable future is probably the same? > > > > > > > >>> > > > > > > > > > >>> > > Overall, my concern with patch work is that we have > > > iterated > > > > on > > > > > > the > > > > > > > >>> > produce > > > > > > > >>> > > request timeout multiple times and new issues keep > coming > > > > back. > > > > > > > >>> Ideally, > > > > > > > >>> > > this time, we want to have a solution that covers all > > > cases, > > > > > even > > > > > > > >>> though > > > > > > > >>> > > that requires a bit more work. > > > > > > > >>> > > > > > > > > > >>> > > Thanks, > > > > > > > >>> > > > > > > > > > >>> > > Jun > > > > > > > >>> > > > > > > > > > >>> > > > > > > > > > >>> > > On Fri, Aug 11, 2017 at 12:30 PM, Sumant Tambe < > > > > > > suta...@gmail.com> > > > > > > > >>> > wrote: > > > > > > > >>> > > > > > > > > > >>> > > > Hi Jun, > > > > > > > >>> > > > > > > > > > > >>> > > > Thanks for looking into it. > > > > > > > >>> > > > > > > > > > > >>> > > > Yes, we did consider this message-level timeout > > approach > > > > and > > > > > > > >>> expiring > > > > > > > >>> > > > batches selectively in a request but rejected it due > to > > > the > > > > > > > >>> reasons of > > > > > > > >>> > > > added complexity without a strong benefit to > > > counter-weigh > > > > > > that. > > > > > > > >>> Your > > > > > > > >>> > > > proposal is a slight variation so I'll mention some > > > issues > > > > > > here. > > > > > > > >>> > > > > > > > > > > >>> > > > 1. It sounds like max.message.delivery.wait.ms will > > > > overlap > > > > > > with > > > > > > > >>> "time > > > > > > > >>> > > > segments" of both linger.ms and retries * ( > > > > > request.timeout.ms > > > > > > + > > > > > > > >>> > > > retry.backoff.ms). In that case, which config set > > takes > > > > > > > >>> precedence? It > > > > > > > >>> > > > would not make sense to configure configs from both > > sets. > > > > > > > >>> Especially, > > > > > > > >>> > we > > > > > > > >>> > > > discussed exhaustively internally that retries and > > > > > > > >>> > > > max.message.delivery.wait.ms can't / shouldn't be > > > > configured > > > > > > > >>> together. > > > > > > > >>> > > > Retires become moot as you already mention. I think > > > that's > > > > > > going > > > > > > > >>> to be > > > > > > > >>> > > > surprising to anyone wanting to use > > > > > > max.message.delivery.wait.ms > > > > > > > . > > > > > > > >>> We > > > > > > > >>> > > > probably need max.message.delivery.wait.ms > > linger.ms > > > or > > > > > > > >>> something > > > > > > > >>> > like > > > > > > > >>> > > > that. > > > > > > > >>> > > > > > > > > > > >>> > > > 2. 
If clock starts when a batch is created and expire > > > when > > > > > > > >>> > > > max.message.delivery.wait.ms is over in the > > accumulator, > > > > the > > > > > > > last > > > > > > > >>> few > > > > > > > >>> > > > messages in the expiring batch may not have lived > long > > > > > enough. > > > > > > As > > > > > > > >>> the > > > > > > > >>> > > > config seems to suggests per-message timeout, it's > > > > incorrect > > > > > to > > > > > > > >>> expire > > > > > > > >>> > > > messages prematurely. On the other hand if clock > starts > > > > after > > > > > > > >>> batch is > > > > > > > >>> > > > closed (which also implies that linger.ms is not > > covered > > > > by > > > > > > the > > > > > > > >>> > > > max.message.delivery.wait.ms config), no message > would > > > be > > > > be > > > > > > > >>> expired > > > > > > > >>> > too > > > > > > > >>> > > > soon. Yeah, expiration may be little bit too late but > > > hey, > > > > > this > > > > > > > >>> ain't > > > > > > > >>> > > > real-time service. > > > > > > > >>> > > > > > > > > > > >>> > > > 3. I agree that steps #3, #4, (and #5) are complex to > > > > > > implement. > > > > > > > >>> On the > > > > > > > >>> > > > other hand, batch.expiry.ms is next to trivial to > > > > implement. > > > > > > We > > > > > > > >>> just > > > > > > > >>> > > pass > > > > > > > >>> > > > the config all the way down to > > ProducerBatch.maybeExpire > > > > and > > > > > be > > > > > > > >>> done > > > > > > > >>> > with > > > > > > > >>> > > > it. > > > > > > > >>> > > > > > > > > > > >>> > > > 4. Do you think the effect of > > > max.message.delivery.wait.ms > > > > > can > > > > > > > be > > > > > > > >>> > > > simulated > > > > > > > >>> > > > with future.get(timeout) method? Copying excerpt from > > the > > > > > > kip-91: > > > > > > > >>> An > > > > > > > >>> > > > end-to-end timeout may be partially emulated using > the > > > > > > > >>> > > future.get(timeout). > > > > > > > >>> > > > The timeout must be greater than (batch.expiry.ms + > > > > nRetries > > > > > > * ( > > > > > > > >>> > > > request.timeout.ms + retry.backoff.ms)). Note that > > when > > > > > future > > > > > > > >>> times > > > > > > > >>> > > out, > > > > > > > >>> > > > Sender may continue to send the records in the > > > background. > > > > To > > > > > > > avoid > > > > > > > >>> > that, > > > > > > > >>> > > > implementing a cancellable future is a possibility. > > > > > > > >>> > > > > > > > > > > >>> > > > For simplicity, we could just implement a trivial > > method > > > in > > > > > > > >>> producer > > > > > > > >>> > > > ProducerConfigs.maxMessageDeliveryWaitMs() and > return > > a > > > > > number > > > > > > > >>> based > > > > > > > >>> > on > > > > > > > >>> > > > this formula? Users of future.get can use this > timeout > > > > value. > > > > > > > >>> > > > > > > > > > > >>> > > > Thoughts? > > > > > > > >>> > > > > > > > > > > >>> > > > Regards, > > > > > > > >>> > > > Sumant > > > > > > > >>> > > > > > > > > > > >>> > > > > > > > > > > >>> > > > > > > > > > > >>> > > > On 11 August 2017 at 07:50, Sumant Tambe < > > > > suta...@gmail.com> > > > > > > > >>> wrote: > > > > > > > >>> > > > > > > > > > > >>> > > > > > > > > > > > >>> > > > > Thanks for the KIP. Nice documentation on all > current > > > > > issues > > > > > > > >>> with the > > > > > > > >>> > > > >> timeout. > > > > > > > >>> > > > > > > > > > > > >>> > > > > For the KIP writeup, all credit goes to Joel Koshy. 
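For reference, the future.get(timeout) emulation mentioned in point 4 above could look roughly like the sketch below. The numbers are placeholders, and batch.expiry.ms is only the key proposed in this KIP, not an existing producer config; request.timeout.ms, retries, and retry.backoff.ms are the existing ones.

import java.util.Properties;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

final class EndToEndTimeoutExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        long batchExpiryMs = 120_000;   // proposed batch.expiry.ms value (hypothetical)
        int retries = 3;                // retries
        long requestTimeoutMs = 30_000; // request.timeout.ms
        long retryBackoffMs = 100;      // retry.backoff.ms

        // Per the KIP excerpt: the get() timeout must exceed
        // batch.expiry.ms + nRetries * (request.timeout.ms + retry.backoff.ms).
        long endToEndTimeoutMs = batchExpiryMs + retries * (requestTimeoutMs + retryBackoffMs) + 1_000;

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> f = producer.send(new ProducerRecord<>("my-topic", "k", "v"));
            RecordMetadata md = f.get(endToEndTimeoutMs, TimeUnit.MILLISECONDS);
            System.out.println("acked at offset " + md.offset());
        }
        // As noted in the KIP: if get() times out, the sender may still deliver the
        // record in the background; a cancellable future would be needed to prevent that.
    }
}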
> > > > > > > >>> > > > > > > > > > > > >>> > > > > I'll follow up on your comments a little later. > > > > > > > >>> > > > > > > > > > > > >>> > > > > > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> You also brought up a good use case for timing > out a > > > > > > message. > > > > > > > >>> For > > > > > > > >>> > > > >> applications that collect and send sensor data to > > > Kafka, > > > > > if > > > > > > > the > > > > > > > >>> data > > > > > > > >>> > > > can't > > > > > > > >>> > > > >> be sent to Kafka for some reason, the application > > may > > > > > prefer > > > > > > > to > > > > > > > >>> > buffer > > > > > > > >>> > > > the > > > > > > > >>> > > > >> more recent data in the accumulator. Without a > > > timeout, > > > > > the > > > > > > > >>> > > accumulator > > > > > > > >>> > > > >> will be filled with old records and new records > > can't > > > be > > > > > > > added. > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> Your proposal makes sense for a developer who is > > > > familiar > > > > > > with > > > > > > > >>> how > > > > > > > >>> > the > > > > > > > >>> > > > >> producer works. I am not sure if this is very > > > intuitive > > > > to > > > > > > the > > > > > > > >>> users > > > > > > > >>> > > > since > > > > > > > >>> > > > >> it may not be very easy for them to figure out how > > to > > > > > > > configure > > > > > > > >>> the > > > > > > > >>> > > new > > > > > > > >>> > > > >> knob to bound the amount of the time when a > message > > is > > > > > > > >>> completed. > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> From users' perspective, Apurva's suggestion of > > > > > > > >>> > > > >> max.message.delivery.wait.ms (which > > > > > > > >>> > > > >> bounds the time when a message is in the > accumulator > > > to > > > > > the > > > > > > > time > > > > > > > >>> > when > > > > > > > >>> > > > the > > > > > > > >>> > > > >> callback is called) seems more intuition. You > listed > > > > this > > > > > in > > > > > > > the > > > > > > > >>> > > > rejected > > > > > > > >>> > > > >> section since it requires additional logic to > > rebatch > > > > > when a > > > > > > > >>> produce > > > > > > > >>> > > > >> request expires. However, this may not be too bad. > > The > > > > > > > >>> following are > > > > > > > >>> > > the > > > > > > > >>> > > > >> things that we have to do. > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> 1. The clock starts when a batch is created. > > > > > > > >>> > > > >> 2. If the batch can't be drained within > > > > > > > >>> > max.message.delivery.wait.ms, > > > > > > > >>> > > > all > > > > > > > >>> > > > >> messages in the batch will fail and the callback > > will > > > be > > > > > > > called. > > > > > > > >>> > > > >> 3. When sending a produce request, we calculate an > > > > > > expireTime > > > > > > > >>> for > > > > > > > >>> > the > > > > > > > >>> > > > >> request that equals to the remaining expiration > time > > > for > > > > > the > > > > > > > >>> oldest > > > > > > > >>> > > > batch > > > > > > > >>> > > > >> in the request. > > > > > > > >>> > > > >> 4. We set the minimum of the expireTime of all > > > inflight > > > > > > > >>> requests as > > > > > > > >>> > > the > > > > > > > >>> > > > >> timeout in the selector poll call (so that the > > > selector > > > > > can > > > > > > > >>> wake up > > > > > > > >>> > > > before > > > > > > > >>> > > > >> the expiration time). > > > > > > > >>> > > > >> 5. 
If the produce response can't be received > within > > > > > > > expireTime, > > > > > > > >>> we > > > > > > > >>> > > > expire > > > > > > > >>> > > > >> all batches in the produce request whose > expiration > > > time > > > > > has > > > > > > > >>> been > > > > > > > >>> > > > reached. > > > > > > > >>> > > > >> For the rest of the batches, we resend them in a > new > > > > > produce > > > > > > > >>> > request. > > > > > > > >>> > > > >> 6. If the producer response has a retriable error, > > we > > > > just > > > > > > > >>> backoff a > > > > > > > >>> > > bit > > > > > > > >>> > > > >> and then retry the produce request as today. The > > > number > > > > of > > > > > > > >>> retries > > > > > > > >>> > > > doesn't > > > > > > > >>> > > > >> really matter now. We just keep retrying until the > > > > > > expiration > > > > > > > >>> time > > > > > > > >>> > is > > > > > > > >>> > > > >> reached. It's possible that a produce request is > > never > > > > > > retried > > > > > > > >>> due > > > > > > > >>> > to > > > > > > > >>> > > > >> expiration. However, this seems the right thing to > > do > > > > > since > > > > > > > the > > > > > > > >>> > users > > > > > > > >>> > > > want > > > > > > > >>> > > > >> to timeout the message at this time. > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> Implementation wise, there will be a bit more > > > complexity > > > > > in > > > > > > > >>> step 3 > > > > > > > >>> > and > > > > > > > >>> > > > 4, > > > > > > > >>> > > > >> but probably not too bad. The benefit is that this > > is > > > > more > > > > > > > >>> intuitive > > > > > > > >>> > > to > > > > > > > >>> > > > >> the > > > > > > > >>> > > > >> end user. > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> Does that sound reasonable to you? > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> Thanks, > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> Jun > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> On Wed, Aug 9, 2017 at 10:03 PM, Sumant Tambe < > > > > > > > >>> suta...@gmail.com> > > > > > > > >>> > > > wrote: > > > > > > > >>> > > > >> > > > > > > > >>> > > > >> > On Wed, Aug 9, 2017 at 1:28 PM Apurva Mehta < > > > > > > > >>> apu...@confluent.io> > > > > > > > >>> > > > >> wrote: > > > > > > > >>> > > > >> > > > > > > > > >>> > > > >> > > > > There seems to be no relationship with > > cluster > > > > > > > metadata > > > > > > > >>> > > > >> availability > > > > > > > >>> > > > >> > or > > > > > > > >>> > > > >> > > > > staleness. Expiry is just based on the > time > > > > since > > > > > > the > > > > > > > >>> batch > > > > > > > >>> > > has > > > > > > > >>> > > > >> been > > > > > > > >>> > > > >> > > > ready. > > > > > > > >>> > > > >> > > > > Please correct me if I am wrong. > > > > > > > >>> > > > >> > > > > > > > > > > > >>> > > > >> > > > > > > > > > > >>> > > > >> > > > I was not very specific about where we do > > > > > expiration. > > > > > > I > > > > > > > >>> > glossed > > > > > > > >>> > > > over > > > > > > > >>> > > > >> > some > > > > > > > >>> > > > >> > > > details because (again) we've other > mechanisms > > > to > > > > > > detect > > > > > > > >>> non > > > > > > > >>> > > > >> progress. 
> > > > > > > >>> > > > >> > > The > > > > > > > >>> > > > >> > > > condition (!muted.contains(tp) && > > > (isMetadataStale > > > > > || > > > > > > > >>> > > > >> > > > > cluster.leaderFor(tp) == null)) is used in > > > > > > > >>> > > > >> > > > RecordAccumualtor.expiredBatches: > > > > > > > >>> > > > >> > > > https://github.com/apache/ > > > > > > kafka/blob/trunk/clients/src/ > > > > > > > >>> > > > >> > > > main/java/org/apache/kafka/ > > > > > > clients/producer/internals/ > > > > > > > >>> > > > >> > > > RecordAccumulator.java#L443 > > > > > > > >>> > > > >> > > > > > > > > > > >>> > > > >> > > > > > > > > > > >>> > > > >> > > > Effectively, we expire in all the following > > > cases > > > > > > > >>> > > > >> > > > 1) producer is partitioned from the brokers. > > > When > > > > > > > >>> metadata age > > > > > > > >>> > > > grows > > > > > > > >>> > > > >> > > beyond > > > > > > > >>> > > > >> > > > 3x it's max value. It's safe to say that > we're > > > not > > > > > > > >>> talking to > > > > > > > >>> > > the > > > > > > > >>> > > > >> > brokers > > > > > > > >>> > > > >> > > > at all. Report. > > > > > > > >>> > > > >> > > > 2) fresh metadata && leader for a partition > is > > > not > > > > > > known > > > > > > > >>> && a > > > > > > > >>> > > > batch > > > > > > > >>> > > > >> is > > > > > > > >>> > > > >> > > > sitting there for longer than > > > request.timeout.ms. > > > > > > This > > > > > > > >>> is one > > > > > > > >>> > > > case > > > > > > > >>> > > > >> we > > > > > > > >>> > > > >> > > > would > > > > > > > >>> > > > >> > > > like to improve and use batch.expiry.ms > > because > > > > > > > >>> > > > request.timeout.ms > > > > > > > >>> > > > >> is > > > > > > > >>> > > > >> > > too > > > > > > > >>> > > > >> > > > small. > > > > > > > >>> > > > >> > > > 3) fresh metadata && leader for a partition > is > > > > known > > > > > > && > > > > > > > >>> batch > > > > > > > >>> > is > > > > > > > >>> > > > >> > sitting > > > > > > > >>> > > > >> > > > there for longer than batch.expiry.ms. This > > is > > > a > > > > > new > > > > > > > case > > > > > > > >>> > that > > > > > > > >>> > > is > > > > > > > >>> > > > >> > > > different > > > > > > > >>> > > > >> > > > from #2. This is the catch-up mode case. > > Things > > > > are > > > > > > > >>> moving too > > > > > > > >>> > > > >> slowly. > > > > > > > >>> > > > >> > > > Pipeline SLAs are broken. Report and > shutdown > > > kmm. > > > > > > > >>> > > > >> > > > > > > > > > > >>> > > > >> > > > The second and the third cases are useful > to a > > > > > > real-time > > > > > > > >>> app > > > > > > > >>> > > for a > > > > > > > >>> > > > >> > > > completely different reason. Report, forget > > > about > > > > > the > > > > > > > >>> batch, > > > > > > > >>> > and > > > > > > > >>> > > > >> just > > > > > > > >>> > > > >> > > move > > > > > > > >>> > > > >> > > > on (without shutting down). > > > > > > > >>> > > > >> > > > > > > > > > > >>> > > > >> > > > > > > > > > > >>> > > > >> > > If I understand correctly, you are talking > > about a > > > > > fork > > > > > > of > > > > > > > >>> > apache > > > > > > > >>> > > > >> kafka > > > > > > > >>> > > > >> > > which has these additional conditions? Because > > > that > > > > > > check > > > > > > > >>> > doesn't > > > > > > > >>> > > > >> exist > > > > > > > >>> > > > >> > on > > > > > > > >>> > > > >> > > trunk today. > > > > > > > >>> > > > >> > > > > > > > > >>> > > > >> > Right. It is our internal release in LinkedIn. 
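For readers following along, the extra expiry gate quoted above (from the internal LinkedIn build, not trunk) amounts to something like the following sketch; the method and surrounding class are illustrative only:

import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

final class ExpiryGate {
    // A batch is only considered for expiration when its partition is not muted
    // and either the cluster metadata is stale or no leader is currently known.
    static boolean mayExpire(TopicPartition tp,
                             Set<TopicPartition> muted,
                             boolean isMetadataStale,
                             Cluster cluster) {
        return !muted.contains(tp) && (isMetadataStale || cluster.leaderFor(tp) == null);
    }
}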
> > Or are you proposing to change the behavior of expiry to account for stale metadata and partitioned producers as part of this KIP?
>
> No. It's our temporary solution in the absence of KIP-91. Note that we don't like increasing request.timeout.ms. Without our extra conditions our batches expire too soon -- a problem in kmm catch-up mode.
>
> If we get batch.expiry.ms, we will configure it to 20 mins. maybeExpire will use the config instead of r.t.ms. The extra conditions will be unnecessary. All three cases shall be covered via the batch.expiry timeout.
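To make that last point concrete, the intended setup would look roughly like the snippet below; batch.expiry.ms is only a proposed key in this KIP, so the snippet is hypothetical.

import java.util.Properties;

final class ProposedBatchExpiryConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("request.timeout.ms", "30000");   // existing config, left at its default
        props.put("batch.expiry.ms", "1200000");    // proposed key: 20 minutes, for kmm catch-up mode
        System.out.println(props);
    }
}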