Thanks for the check! Makes sense to me.

On Wed, May 20, 2020 at 3:27 PM Guozhang Wang <wangg...@gmail.com> wrote:
Just to clarify on the implementation details: today inside Sender#runOnce() we have the following:

```
if (maybeSendAndPollTransactionalRequest()) {
    return;
}
```

This basically means that we hold off sending data batches as long as we still have *any* in-flight txn request, so we are not really blocking on the first sent request but on all request types. For add-partitions requests that makes sense, since once unblocked, all requests would unblock at the same time anyway; but we do not necessarily need to block on AddOffsetCommitsToTxnRequest and TxnOffsetCommitRequest.

So we can relax the above restriction as long as 1) the txn coordinator is known, 2) the producer has a valid PID, and 3) all partitions are registered in the txn.

Guozhang

On Wed, May 20, 2020 at 3:13 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks Guozhang for the new proposal. I agree that we could deliver https://issues.apache.org/jira/browse/KAFKA-9878 first and measure the following metrics:

1. The total volume of AddPartitionToTxn requests
2. The time spent propagating transaction state updates during the transaction
3. The time spent propagating transaction markers

If those metrics suggest that we are doing a pretty good job already, then the improvement of delivering the entire KIP-609 is minimal. In the meantime, I updated KAFKA-9878 with more details. Additionally, I realized that we should change the logic of the AddPartitionToTxn call so that we maintain a future queue and wait for all of the delta-change completions, instead of blocking on the first one sent out. Does that make sense?
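[Editor's note] The two ideas above — gating data sends on the three relaxed conditions rather than on any in-flight txn request, and waiting on a queue of futures instead of the first sent request — can be sketched roughly as below. This is a hypothetical illustration, not actual Kafka producer code; all names are invented for clarity.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch (not real Sender code): gate data batches on the
// three conditions from the proposal, and track each incremental
// AddPartitionsToTxn request as a future so we wait on all of them
// rather than parking on whichever was sent first.
public class TxnGateSketch {

    // Relaxed condition from the thread: data batches may go out once
    // 1) the txn coordinator is known, 2) the producer has a valid PID,
    // and 3) all partitions touched so far are registered in the txn.
    static boolean canSendDataBatches(boolean coordinatorKnown,
                                      boolean hasValidPid,
                                      boolean allPartitionsRegistered) {
        return coordinatorKnown && hasValidPid && allPartitionsRegistered;
    }

    // Future-queue idea: one future per incremental AddPartitionsToTxn
    // request; the combined future completes only when all are done.
    static CompletableFuture<Void> awaitAllAddPartitions(
            List<CompletableFuture<Void>> pending) {
        return CompletableFuture.allOf(pending.toArray(new CompletableFuture[0]));
    }
}
```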
On Wed, May 20, 2020 at 2:28 PM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Matthias,

I have a quick question regarding the motivation of the long-blocking and batch-add-partitions behavior: do we think the latency primarily comes from the network round trips, or from the coordinator-side transaction-log appends? If we believe it is coming from the latter, then perhaps we can first consider optimizing that without making any public changes, specifically:

1) We block on the add-partitions requests in a purgatory, as proposed in your KIP.

2) When try-completing the parked add-partitions requests in the purgatory, we consolidate them into a single txn state transition with a single append to the transaction log.

3) *Optionally*, on the client side we can further optimize the behavior: instead of blocking all batch sends as long as there are any txn requests in flight, we would just query which partitions have successfully "registered" as part of the txn from the add-partitions response, and then send records for those partitions. By doing this we can reduce the end-to-end blocking time.

None of the above changes actually requires any public API or protocol changes. In addition, it would not make things worse even in edge cases, whereas with the proposed API change, if the producer pre-registered a bunch of partitions but then timed out, the coordinator would need to write abort markers to those pre-registered partitions unnecessarily. We can measure the avg. number of txn-log appends per transaction on the broker side and see if it is already close to 1.

Guozhang

On Tue, May 19, 2020 at 10:33 AM Boyang Chen <reluctanthero...@gmail.com> wrote:

Hey John,

thanks for the insights!
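[Editor's note] Point (2) above — completing several parked add-partitions requests with one transaction-log append instead of one append each — amounts to merging the requests' partition sets before writing the state transition. A minimal sketch, with invented names that do not mirror the real coordinator API:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of consolidating parked AddPartitionsToTxn
// requests for the same transaction: merge all their partition sets so
// that try-completion costs a single transaction-log append.
public class AddPartitionsConsolidation {

    // Merge the partition sets of every parked request into the single
    // state transition that will be appended to the transaction log.
    static Set<String> consolidate(List<Set<String>> parkedRequests) {
        Set<String> merged = new HashSet<>();
        for (Set<String> partitions : parkedRequests) {
            merged.addAll(partitions);
        }
        return merged; // one append now covers every parked request
    }
}
```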
Replied inline.

On Tue, May 19, 2020 at 7:48 AM John Roesler <vvcep...@apache.org> wrote:

Thanks for the KIP, Boyang!

This looks good and reasonable to me overall.

J1: One clarification: you mention that the blocking behavior depends on a new broker version, which sounds good to me, but I didn't see why we would need to throw any UnsupportedVersionExceptions. It sounds a little like you just want to implement a kind of long polling on the AddPartitionToTxn API, such that the broker would optimistically block for a while when there is a pending prior transaction.

Can this just be a behavior change on the broker side, such that both old and new clients would be asked to retry when the broker is older, and both old and new clients would instead see the API call block for longer (but succeed more often) when the broker is newer?

Related: is it still possible to get back the "please retry" error from the broker, or is it guaranteed to block until the call completes?

[Boyang] This is a good observation. I agree the blocking behavior could benefit all the producer versions older than 0.11, which could be retried. Added to the KIP.

J2: Please forgive my ignorance, but is there any ill effect if a producer adds a partition to a transaction and then commits without having added any data to the transaction?
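[Editor's note] The "long polling" behavior change in J1 can be summarized as a decision table: a newer broker parks the request instead of immediately returning a retriable error, while an older broker (or a timeout) still yields "please retry". The sketch below is an invented illustration of that logic, not the real broker handler or protocol enum:

```java
// Hypothetical sketch of the J1 idea: rather than immediately answering
// with a retriable error while a prior transaction is still completing,
// a newer broker blocks (parks the request) for up to a timeout.
// Response names here are illustrative, not actual Kafka error codes.
public class LongPollSketch {
    enum Response { OK, PLEASE_RETRY }

    static Response handleAddPartitions(boolean priorTxnPending,
                                        boolean brokerSupportsBlocking,
                                        boolean priorTxnFinishedWithinTimeout) {
        if (!priorTxnPending) {
            return Response.OK;
        }
        if (!brokerSupportsBlocking) {
            // Older broker: old and new clients alike are asked to retry.
            return Response.PLEASE_RETRY;
        }
        // Newer broker: block until the prior txn completes; "please
        // retry" remains possible if the wait times out.
        return priorTxnFinishedWithinTimeout ? Response.OK : Response.PLEASE_RETRY;
    }
}
```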
I can see this happening, e.g., if I know that my application generally sends to 5 TopicPartitions, I would use the new beginTransaction call and just always give it the same list of partitions, and _then_ do the processing, which may or may not send data to all five potential partitions.

[Boyang] Yes, that's possible, which is the reason why we discussed bumping the EndTxn request to include only the partitions actually written to, so that the transaction coordinator will send markers only to those partitions. The worst case for a misused pre-registration API is writing out more transaction markers than necessary. For one, I do see the benefit of doing that: it is a life-saver for a "lazy user" who doesn't want to infer the output partitions it would write to and instead always registers the full set of output partitions. With this observation in mind, bumping EndTxn makes sense.

Thanks again!
-John

On Mon, May 18, 2020, at 10:25, Boyang Chen wrote:

Oh, I see your point! Will add that context to the KIP.

Boyang

On Sun, May 17, 2020 at 11:39 AM Guozhang Wang <wangg...@gmail.com> wrote:

My point here is only about the first AddPartitionToTxn request of the transaction, since only that request would potentially be blocked on the previous txn to complete. By deferring it we reduce the blocking time.

I think StreamsConfig overrides linger.ms to 100ms, not 10ms, so in the best case we can defer the first AddPartitionToTxn of the transaction by 100ms.
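[Editor's note] The bumped-EndTxn idea discussed above means the coordinator writes commit/abort markers only to the intersection of the pre-registered partitions and the partitions the producer actually wrote to. A minimal sketch with invented names (the real EndTxn request/response schemas differ):

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the bumped EndTxn: the client reports which
// pre-registered partitions actually received data, so markers are
// written only to those, never to unused pre-registrations.
public class EndTxnSketch {

    // Partitions that receive a commit/abort marker: pre-registered
    // partitions that were actually written to.
    static Set<String> markerTargets(Set<String> preRegistered,
                                     Set<String> actuallyWritten) {
        Set<String> targets = new HashSet<>(preRegistered);
        targets.retainAll(actuallyWritten);
        return targets;
    }
}
```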
Guozhang

On Sat, May 16, 2020 at 12:20 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Thanks Guozhang for the context. The producer batch is bounded either by size or by the linger time. For the default 10ms linger and a 100ms transaction commit interval, the producer will be gated by AddPartitionToTxn up to 10 times in the worst case. I think the improvement here aims at the worst-case scenario for users who didn't realize how the internals work and use the API calls in a very inefficient way, as in the scenario where record processing and send() happen concurrently.

Boyang

On Sat, May 16, 2020 at 10:19 AM Guozhang Wang <wangg...@gmail.com> wrote:

Hello Boyang,

Thanks for the proposed KIP; overall it makes sense to me.

One non-public-API point that I'd like to make, though, is that in the KafkaProducer.send call we can potentially defer sending the AddPartitionsToTxn request until the sender is about to send the first batch -- this is what I observed in some soak-testing investigation: the batching effect actually allows the first record to be sent much later than the send() call, and that can be leveraged to further reduce the time we would be blocked on the AddPartitionsToTxn request.
Guozhang

On Thu, May 14, 2020 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:

Hey all,

I would like to start the discussion for KIP-609:

https://cwiki.apache.org/confluence/display/KAFKA/KIP-609%3A+Use+Pre-registration+and+Blocking+Calls+for+Better+Transaction+Efficiency

This KIP aims to improve the current EOS semantics by making the processing more efficient and consolidated.

Thanks!
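[Editor's note] The worst-case count mentioned in the thread (a 10ms default linger against a 100ms transaction commit interval giving up to 10 AddPartitionToTxn round trips per transaction) is simple arithmetic, checked below. The method name is invented for this illustration:

```java
// Back-of-the-envelope check of the worst case from the thread: a
// producer that registers new partitions on every linger-bounded batch
// could pay the AddPartitionToTxn round trip up to
// commitIntervalMs / lingerMs times per transaction.
public class WorstCaseMath {
    static long maxAddPartitionsPerTxn(long commitIntervalMs, long lingerMs) {
        return commitIntervalMs / lingerMs;
    }
}
```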