>
> I think co-locating does have some merits here, i.e. letting the
> ConsumerCoordinator, which holds the source of truth for the assignment,
> act as the TxnCoordinator as well; but I agree there are also some cons to
> coupling them together. I'm still leaning a bit towards colocation, but if
> there are good rationales not to do so I can be convinced as well.


The good rationale is that we have no mechanism to colocate partitions ;).
Are you suggesting we store the group and transaction state in the same
log? Can you be more concrete about the benefit?

-Jason
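
To make the colocation concern concrete, here is a minimal sketch. It assumes
each coordinator is located by hashing an id onto a partition of its own
internal topic and then using that partition's leader. The class name, the
`partitionFor` helper, and the partition counts are illustrative assumptions,
not the broker code.

```java
public class CoordinatorPartitionSketch {

    // Hypothetical helper: non-negative hash of the id, modulo the partition
    // count of an internal topic. Integer.MIN_VALUE is mapped to 0 because
    // Math.abs would leave it negative.
    static int partitionFor(String id, int numPartitions) {
        int hash = id.hashCode();
        int nonNegative = (hash == Integer.MIN_VALUE) ? 0 : Math.abs(hash);
        return nonNegative % numPartitions;
    }

    public static void main(String[] args) {
        String groupId = "a";        // "a".hashCode() == 97
        int offsetsPartitions = 50;  // assumed partition count of the offsets topic
        int txnLogPartitions = 25;   // assumed, deliberately different count

        // The same id lands on different partition indexes once the counts differ.
        System.out.println(partitionFor(groupId, offsetsPartitions)); // 97 % 50 = 47
        System.out.println(partitionFor(groupId, txnLogPartitions));  // 97 % 25 = 22
    }
}
```

Even when both topics have the same partition count and the indexes match,
the leaders of the two partitions are assigned independently, so nothing in
this scheme by itself places the two coordinators on the same broker.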

On Tue, Jun 25, 2019 at 10:51 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hi Boyang,
>
> 1. One advantage of retry over on-hold is that it will not tie up a
> handler thread (of course the latter could do the same, but that involves
> using a purgatory, which is more complicated), and it is also less likely to
> violate the request timeout. So I think there are some rationales to prefer
> retries.
>
> 2. Regarding "ConsumerRebalanceListener": both ConsumerRebalanceListener
> and PartitionAssignor are user-customizable modules; the only difference
> is that the former is specified via code and the latter is specified via
> config.
>
> Regarding Jason's proposal of ConsumerAssignment, one thing to note, though,
> is that with KIP-429 onPartitionsAssigned may not be called if the assignment
> does not change, whereas onAssignment would always be called at the end of
> the sync-group response. My proposed semantics are that
> `RebalanceListener#onPartitionsXXX` is used for notifications to the user, and
> hence if there are no changes these will not be called, whereas
> `PartitionAssignor` is used for assignor logic, whose callback would always
> be called whether or not the partitions have changed.
>
> 3. I feel it is a bit awkward to let the TxnCoordinator keep partition
> assignments, since it is sort of taking over the job of the
> ConsumerCoordinator, and may well cause a split-brain problem, as the two
> coordinators would each keep a copy of the assignment and the copies may
> differ.
>
> I think co-locating does have some merits here, i.e. letting the
> ConsumerCoordinator, which holds the source of truth for the assignment,
> act as the TxnCoordinator as well; but I agree there are also some cons to
> coupling them together. I'm still leaning a bit towards colocation, but if
> there are good rationales not to do so I can be convinced as well.
>
> 4. I guess I prefer the philosophy of "only add configs if there's no
> other way", since more and more configs make it less and less
> intuitive to use out of the box.
>
> I think it's a valid point that a check at startup does not cope with
> brokers downgrading, but even with a config it is still hard for users
> to determine when they can be sure the broker will never be downgraded
> again and hence can safely switch the config. So my feeling is that this
> config still would not help much. If we want to be on the safer
> side, then I'd suggest we modify the Coordinator -> NetworkClient hierarchy
> to allow the NetworkClient to pass the APIVersion metadata to the
> Coordinator, so that the Coordinator can rely on it to change its
> behavior dynamically.
>
> 5. I do not have a concrete idea of what the impact on Connect would
> be; maybe Randall or Konstantine can help here?
>
>
> Guozhang
>
> On Mon, Jun 24, 2019 at 10:26 PM Boyang Chen <reluctanthero...@gmail.com>
> wrote:
>
> > Hey Jason,
> >
> > thank you for the proposal here. Some of my thoughts below.
> >
> > On Mon, Jun 24, 2019 at 8:58 PM Jason Gustafson <ja...@confluent.io>
> > wrote:
> >
> > > Hi Boyang,
> > >
> > > Thanks for picking this up! Still reading through the updates, but here
> > > are a couple initial comments on the APIs:
> > >
> > > 1. The `TxnProducerIdentity` class is a bit awkward. I think we are
> > > trying to encapsulate state from the current group assignment. Maybe
> > > something like `ConsumerAssignment` would be clearer? If we make the
> > > usage consistent across the consumer and producer, then we can avoid
> > > exposing internal state like the generationId.
> > >
> > > For example:
> > >
> > > // Public API
> > > interface ConsumerAssignment {
> > >   Set<TopicPartition> partitions();
> > > }
> > >
> > > // Not a public API
> > > class InternalConsumerAssignment implements ConsumerAssignment {
> > >   Set<TopicPartition> partitions;
> > >   int generationId;
> > > }
> > >
> > > Then we can change the rebalance listener to something like this:
> > > onPartitionsAssigned(ConsumerAssignment assignment)
> > >
> > > And on the producer:
> > > void initTransactions(String groupId, ConsumerAssignment assignment);
> > >
> > > 2. Another bit of awkwardness is the fact that we have to pass the
> > > groupId through both initTransactions() and sendOffsetsToTransaction().
> > > We could consider a config instead. Maybe something like
> > > `transactional.group.id`? Then we could simplify the producer APIs,
> > > potentially even deprecating the current sendOffsetsToTransaction. In
> > > fact, for this new usage, the `transactional.id` config is not needed.
> > > It would be nice if we don't have to provide it.
> > >
> > I like the idea of combining 1 and 2. We could definitely pass in a
> > group.id config so that we could avoid exposing that information in a
> > public API. The question I have is whether we should name the interface
> > `GroupAssignment` instead, so that Connect could later extend the same
> > interface, just to echo Guozhang's point here. Also, the base interface
> > is better defined empty for easy extension, or we could define an
> > abstract type called `Resource` to be shareable later, IMHO.
> >
> >
> > > By the way, I'm a bit confused about the discussion above about
> > > colocating the txn and group coordinators. That is not actually
> > > necessary, is it?
> > >
> > Yes, this is not a requirement for this KIP, because it is inherently
> > impossible to co-locate the topic partitions of the transaction log and
> > the consumer offsets topic.
> >
> >
> > > Thanks,
> > > Jason
> > >
> > > On Mon, Jun 24, 2019 at 10:07 AM Boyang Chen <reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thank you Ismael for the suggestion. We will attempt to address it by
> > > > adding more details to the rejected alternatives section.
> > > >
> > > >
> > > > Thank you for the comment Guozhang! Answers are inline below.
> > > >
> > > >
> > > >
> > > > On Sun, Jun 23, 2019 at 6:33 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Boyang,
> > > > >
> > > > > Thanks for the KIP, I have some comments below:
> > > > >
> > > > > 1. "Once transactions are complete, the call will return." This
> > > > > seems different from the existing behavior, in which we would
> > > > > return a retriable CONCURRENT_TRANSACTIONS and let the client
> > > > > retry. Is this intentional?
> > > > >
> > > >
> > > > I don’t think it is intentional, and I will defer this question to
> > > > Jason when he has time to answer, since from what I understand retry
> > > > and on-hold both seem to be valid approaches.
> > > >
> > > >
> > > > > 2. "an overload to onPartitionsAssigned in the consumer's rebalance
> > > > > listener interface": as part of KIP-341 we've already added this
> > > > > information to the onAssignment callback. Would this be sufficient?
> > > > > Or, more generally speaking, which information has to be passed
> > > > > around in the rebalance callback, and which can be passed around in
> > > > > the PartitionAssignor callback? In Streams, for example, both
> > > > > callbacks are used, but the most critical information is passed via
> > > > > onAssignment.
> > > > >
> > > >
> > > > We still need to extend ConsumerRebalanceListener because it’s the
> > > > interface we have public access to. The #onAssignment call is
> > > > defined at the PartitionAssignor level, which is not easy to work
> > > > with for external producers.
> > > >
> > > >
> > > > > 3. "We propose to use a separate record type in order to store the
> > > > > group assignment.": hmm, I thought that with the third type of
> > > > > FindCoordinator, the same broker that acts as the consumer
> > > > > coordinator would always be selected as the txn coordinator, in
> > > > > which case it can already access its local cache metadata / offset
> > > > > topic to get this information? We just need to think about how to
> > > > > make these two modules exchange information directly without
> > > > > messing up the code hierarchy.
> > > > >
> > > >
> > > > These two coordinators will be on the same broker only when the
> > > > number of partitions for the transaction state topic and the
> > > > consumer offset topic are the same. This normally holds true, but
> > > > I'm afraid we can't make this assumption?
> > > >
> > > > > 4. The config of "CONSUMER_GROUP_AWARE_TRANSACTION": it seems the
> > > > > goal of this config is just to avoid an old-versioned broker not
> > > > > being able to recognize a newer-versioned client. I think we can do
> > > > > something else to avoid this config, though: for example, we can
> > > > > use the embedded AdminClient to send the APIVersion request upon
> > > > > starting up, and based on the returned value decide whether to go
> > > > > to the old code path or the new behavior. Admittedly, asking a
> > > > > random broker about APIVersion does not guarantee the whole
> > > > > cluster's versions, but what we can do is to first 1) find the
> > > > > coordinator (and if the random broker does not even recognize the
> > > > > new discover type, fall back to the old path directly), and then 2)
> > > > > ask the discovered coordinator about its supported APIVersion.
> > > > >
> > > >
> > > > The caveat here is that we have to make sure both the group
> > > > coordinator and the transaction coordinator are on the latest version
> > > > during the init stage. This is potentially doable, as we only need a
> > > > consumer group.id to check that. In the meantime, a hard-coded config
> > > > is still a favorable backup in case the server has been downgraded and
> > > > you want to use a new-version client without `consumer group`
> > > > transactional support.
> > > >
> > > > > 5. This is a meta question: have you considered how this can be
> > > > > applied to Kafka Connect as well? For example, for source
> > > > > connectors, the assignment is not by "partitions" but by some other
> > > > > sort of "resources" based on the source systems. How would KIP-447
> > > > > affect Kafka connectors that implement EOS as well?
> > > > >
> > > >
> > > > No, it's not currently included in the scope. Could you point me to a
> > > > sample source connector that uses EOS? We could always piggy-back
> > > > more information, such as tasks, onto the TxnProducerIdentity struct.
> > > > If this is something to support in the near term, an abstract type
> > > > called "Resource" could be provided, with topic partition and Connect
> > > > task implementing it.
> > > >
> > > >
> > > > >
> > > > > Guozhang
> > > > >
> > > > >
> > > > > On Sat, Jun 22, 2019 at 8:40 PM Ismael Juma <ism...@juma.me.uk>
> > > > > wrote:
> > > > >
> > > > > > Hi Boyang,
> > > > > >
> > > > > > Thanks for the KIP. It's good that we listed a number of rejected
> > > > > > alternatives. It would be helpful to have an explanation of why
> > > > > > they were rejected.
> > > > > >
> > > > > > Ismael
> > > > > >
> > > > > > On Sat, Jun 22, 2019 at 8:31 PM Boyang Chen <bche...@outlook.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hey all,
> > > > > > >
> > > > > > > I would like to start a discussion for KIP-447:
> > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-447%3A+Producer+scalability+for+exactly+once+semantics
> > > > > > >
> > > > > > > This work was originated by Jason Gustafson, and we would like to
> > > > > > > proceed to the discussion stage.
> > > > > > >
> > > > > > > Let me know your thoughts, thanks!
> > > > > > >
> > > > > > > Boyang
> > > > > > >
> > > > > >
> > > > >
> > > > >
> > > > > --
> > > > > -- Guozhang
> > > > >
> > > >
> > >
> >
>
>
> --
> -- Guozhang
>
