> I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source of truth for the assignment, act as the TxnCoordinator as well; but I agree there are also some cons to coupling them together. I'm still leaning a bit towards colocation, but if there are good rationales not to do so I can be convinced as well.
The good rationale is that we have no mechanism to colocate partitions ;). Are you suggesting we store the group and transaction state in the same log? Can you be more concrete about the benefit?

-Jason

On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Boyang,
>
> 1. One advantage of retry over on-hold is that it does not tie up a handler thread (of course the latter could avoid that too, but that involves using a purgatory, which is more complicated), and it is also less likely to violate the request timeout. So I think there are some rationales to prefer retries.
>
> 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener and PartitionAssignor are user-customizable modules; the only difference is that the former is specified via code and the latter via config.
>
> Regarding Jason's proposal of ConsumerAssignment, one thing to note is that with KIP-429 onPartitionsAssigned may not be called if the assignment does not change, whereas onAssignment would always be called at the end of the sync-group response. My proposed semantics is that `RebalanceListener#onPartitionsXXX` are used for notifications to the user, and hence will not be called if nothing changes, whereas `PartitionAssignor` is used for assignor logic, whose callback is always called whether or not the partitions have changed.
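>
> To make that split concrete, a rough sketch (the listener below uses the real public interface; the assignor callback signature is only approximate, not the exact interface):
>
> import java.util.Collection;
> import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
> import org.apache.kafka.common.TopicPartition;
>
> // Notification to the user: with KIP-429 these callbacks may be
> // skipped entirely when the new assignment equals the old one.
> public class NotifyingListener implements ConsumerRebalanceListener {
>     @Override
>     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
>         System.out.println("Revoked: " + partitions);
>     }
>
>     @Override
>     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
>         System.out.println("Newly assigned: " + partitions);
>     }
> }
>
> // Assignor logic: invoked on every sync-group response, whether or not
> // the assignment changed (approximate signature, post-KIP-341):
> //   void onAssignment(Assignment assignment, int generation);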
>
> 3. I feel it is a bit awkward to let the TxnCoordinator keep partition assignments, since it is sort of taking over the job of the ConsumerCoordinator, and it may well cause a split-brain problem, as two coordinators each keep a copy of this assignment, which may diverge.
>
> I think co-locating does have some merits here, i.e. letting the ConsumerCoordinator, which has the source of truth for the assignment, act as the TxnCoordinator as well; but I agree there are also some cons to coupling them together. I'm still leaning a bit towards colocation, but if there are good rationales not to do so I can be convinced as well.
>
> 4. I guess I prefer the philosophy of "only add configs if there's no other way", since more and more configs make the system less and less intuitive to use out of the box.
>
> I think it's a valid point that a check upon startup does not cope with brokers downgrading, but even with a config it is still hard for users to determine when they can be sure the broker will never downgrade again and hence when they can safely switch the config. So my feeling is that this config would still not help much. If we want to be on the safer side, I'd suggest we modify the Coordinator -> NetworkClient hierarchy to allow the NetworkClient to pass the APIVersion metadata to the Coordinator, so that the Coordinator can rely on it to change its behavior dynamically.
>
> 5. I do not have a concrete idea about what the impact on Connect would be; maybe Randall or Konstantine can help here?
>
>
> Guozhang
>
> On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com> wrote:
>
> > Hey Jason,
> >
> > thank you for the proposal here. Some of my thoughts below.
> >
> > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io> wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for picking this up! Still reading through the updates, but here are a couple of initial comments on the APIs:
> > >
> > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are trying to encapsulate state from the current group assignment. Maybe something like `ConsumerAssignment` would be clearer? If we make the usage consistent across the consumer and producer, then we can avoid exposing internal state like the generationId.
> > >
> > > For example:
> > >
> > > // Public API
> > > interface ConsumerAssignment {
> > >     Set<TopicPartition> partitions();
> > > }
> > >
> > > // Not a public API
> > > class InternalConsumerAssignment implements ConsumerAssignment {
> > >     Set<TopicPartition> partitions;
> > >     int generationId;
> > > }
> > >
> > > Then we can change the rebalance listener to something like this:
> > >
> > > onPartitionsAssigned(ConsumerAssignment assignment)
> > >
> > > And on the producer:
> > >
> > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > >
> > > 2. Another bit of awkwardness is the fact that we have to pass the groupId through both initTransactions() and sendOffsetsToTransaction(). We could consider a config instead. Maybe something like `transactional.group.id`? Then we could simplify the producer APIs, potentially even deprecating the current sendOffsetsToTransaction. In fact, for this new usage, the `transactional.id` config is not needed. It would be nice if we don't have to provide it.
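> > >
> > > To picture 1 and 2 together, a rough end-to-end sketch (names come from the proposal above or are hypothetical helpers; only the sendOffsetsToTransaction overload shown is an existing API):
> > >
> > > // Hypothetical flow with the proposed API. `assignment` would be
> > > // captured from the new onPartitionsAssigned(ConsumerAssignment)
> > > // callback; the two-argument initTransactions() is the sketch above.
> > > producer.initTransactions("my-group", assignment);
> > > while (running) {
> > >     ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
> > >     producer.beginTransaction();
> > >     for (ConsumerRecord<byte[], byte[]> record : records)
> > >         producer.send(new ProducerRecord<>("output", record.key(), record.value()));
> > >     // offsetsFrom() is a hypothetical helper building the offset map;
> > >     // this overload of sendOffsetsToTransaction() exists today.
> > >     producer.sendOffsetsToTransaction(offsetsFrom(records), "my-group");
> > >     producer.commitTransaction();
> > > }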
"an overload to onPartitionsAssigned in the consumer's rebalance > > > > > listener interface": as part of KIP-341 we've already add this > > > > information > > > > > to the onAssignment callback. Would this be sufficient? Or more > > > generally > > > > > speaking, which information have to be passed around in rebalance > > > > callback > > > > > while others can be passed around in PartitionAssignor callback? In > > > > Streams > > > > > for example both callbacks are used but most critical information > is > > > > passed > > > > > via onAssignment. > > > > > > > > > > > > > We still need to extend ConsumerRebalanceListener because it’s the > > > > interface we could have public access to. The #onAssignment call is > > > defined > > > > on PartitionAssignor level which is not easy to work with external > > > > producers. > > > > > > > > > > > > > 3. "We propose to use a separate record type in order to store the > > > group > > > > > assignment.": hmm, I thought with the third typed FindCoordinator, > > the > > > > same > > > > > broker that act as the consumer coordinator would always be > selected > > > as > > > > > the txn coordinator, in which case it can access its local cache > > > > metadata / > > > > > offset topic to get this information already? We just need to think > > > about > > > > > how to make these two modules directly exchange information without > > > > messing > > > > > up the code hierarchy. > > > > > > > > > > > > > These two coordinators will be on the same broker only when number of > > > > partitions for transaction state topic and consumer offset topic are > > the > > > > same. This normally holds true, but I'm afraid > > > > we couldn't make this assumption? > > > > > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the > goal > > of > > > > > this config is just to avoid old-versioned broker to not be able to > > > > > recognize newer versioned client. I think if we can do something > else > > > to > > > > > avoid this config though, for example we can use the embedded > > > AdminClient > > > > > to send the APIVersion request upon starting up, and based on the > > > > returned > > > > > value decides whether to go to the old code path or the new > behavior. > > > > > Admittedly asking a random broker about APIVersion does not > guarantee > > > the > > > > > whole cluster's versions, but what we can do is to first 1) find > the > > > > > coordinator (and if the random broker does not even recognize the > new > > > > > discover type, fall back to old path directly), and then 2) ask the > > > > > discovered coordinator about its supported APIVersion. > > > > > > > > > > > > > The caveat here is that we have to make sure both the group > coordinator > > > and > > > > transaction coordinator are on the latest version during init stage. > > This > > > > is potentially doable as we only need a consumer group.id > > > > to check that. In the meantime, a hard-coded config is still a > > favorable > > > > backup in case the server has downgraded, so you will want to use a > new > > > > version client without `consumer group` transactional support. > > > > > > > > 5. This is a meta question: have you considered how this can be > applied > > > to > > > > > Kafka Connect as well? For example, for source connectors, the > > > assignment > > > > > is not by "partitions", but by some other sort of "resources" based > > on > > > > the > > > > > source systems, how KIP-447 would affect Kafka Connectors that > > > > implemented > > > > > EOS as well? 
> > > >
> > > > > 4. The config "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the goal of this config is just to avoid an old-versioned broker being unable to recognize a newer-versioned client. I think we can do something else to avoid this config though; for example, we can use the embedded AdminClient to send the APIVersion request upon starting up, and based on the returned value decide whether to go down the old code path or adopt the new behavior. Admittedly, asking a random broker about APIVersion does not guarantee the whole cluster's versions, but what we can do is first 1) find the coordinator (and if the random broker does not even recognize the new discover type, fall back to the old path directly), and then 2) ask the discovered coordinator about its supported APIVersion.
> > > >
> > > > The caveat here is that we have to make sure both the group coordinator and the transaction coordinator are on the latest version during the init stage. This is potentially doable, as we only need a consumer group.id to check that. In the meantime, a hard-coded config is still a favorable backup in case the server is downgraded and you want to use a newer-version client without `consumer group` transactional support.
> > > >
> > > > > 5. This is a meta question: have you considered how this can be applied to Kafka Connect as well? For example, for source connectors the assignment is not by "partitions" but by some other sort of "resources" based on the source systems. How would KIP-447 affect Kafka connectors that have implemented EOS?
> > > >
> > > > No, it's not currently included in the scope. Could you point me to a sample source connector that uses EOS? We could always piggyback more information, such as tasks, into the TxnProducerIdentity struct. If this is something to support in the near term, an abstract type called "Resource" could be provided, letting topic partitions and Connect tasks implement it.
> > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk> wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > Thanks for the KIP. It's good that we listed a number of rejected alternatives. It would be helpful to have an explanation of why they were rejected.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com> wrote:
> > > > > >
> > > > > > > Hey all,
> > > > > > >
> > > > > > > I would like to start a discussion for KIP-447:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > >
> > > > > > > This is a work originated by Jason Gustafson, and we would like to proceed into the discussion stage.
> > > > > > >
> > > > > > > Let me know your thoughts, thanks!
> > > > > > >
> > > > > > > Boyang
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
>
>
> --
> -- Guozhang