Thanks once again for the KIP and the updates, Chris. As mentioned in my
prior email, here are my more general comments/questions (other than my
earlier question about the transaction boundaries). Hopefully these are
readable:

> The worker-level offset.flush.timeout.ms property will be ignored for
> exactly-once source tasks. They will be allowed to take as long as
> necessary to complete an offset commit, since the cost of failure at that
> point is to fail the source task. Currently, all source task offset commits
> take place on a single shared worker-global thread. In order to support
> source task commits without a timeout, but also prevent laggy tasks from
> disrupting the availability of other tasks on the cluster, the worker will
> be modified to permit simultaneous source task offset commits.


This seems to contradict the earlier quote. Specifically, the first quote
above states that transaction boundaries are dictated by offset flushes,
which are controlled by the `offset.flush.interval.ms` property. However,
the second quote says the `offset.flush.timeout.ms` property will be
ignored for EOS source tasks. I must be missing something.

> It may take longer than the transaction timeout for a task to flush all of
> its records to Kafka. In this case, there are two actions that users can
> take to nurse their connector back to health: reconfigure it to produce
> records with a lower throughput, or increase the transaction timeout for
> the producers used by the connector.


Does it seem realistic or practical to suggest reducing the source
connector's throughput?

> Finally, it allows users to limit the effect that hanging transactions on
> an offsets topic will have. If tasks A and B use the same offsets topic,
> and task A initiates a transaction on that offsets topic right before task
> B starts up, then task A dies suddenly without committing its transaction,
> task B will have to wait for that transaction to time out before it can
> read to the end of the offsets topic. If the transaction timeout is set
> very high for task A (to accommodate bursts of high throughput, for
> example), this will block task B from processing any data for a long time.
> Although this scenario may be unavoidable in some cases, using a dedicated
> offsets topic for each connector should allow cluster administrators to
> isolate the blast radius of a hanging transaction on an offsets topic. This
> way, although tasks of the same connector may still interfere with each
> other, they will at least not interfere with tasks of other connectors.
> This should be sufficient for most multitenant environments.


Do you mean to use "task A" and "task B" here? Do they imply tasks from the
same connector? If so, then won't the offsets from both of those tasks
still be written to the same connector-specific offset topic, and suffer
from the potential blocking issue mentioned above? While it's useful to
call this out, I'm not sure how that example helps support the motivation
for a separate per-connector offset topic. OTOH, if these were tasks from
_different_ connectors, then it becomes clear that the offsets from one
source connector using a connector-specific offsets topic will never block
the offsets from another connector using a different connector-specific
offsets topic. Thus, having each connector use separate connector-specific
offset topics at least avoids the problem of one connector's tasks blocking
the tasks of the other connector just because of transaction commit timeout
issues.
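
To make the point about blast radius concrete, here is a minimal sketch.
It assumes, purely for illustration, that a task is blocked exactly when it
reads the same offsets topic as a task that died mid-transaction. The task
names, topic names, and the `blocked_tasks` helper are hypothetical and not
part of the KIP.

```python
def blocked_tasks(offsets_topic_by_task, hung_task):
    """Tasks that must wait on hung_task's open transaction.

    Assumption for illustration: a task is blocked exactly when it reads
    the same offsets topic as the task that died mid-transaction.
    """
    topic = offsets_topic_by_task[hung_task]
    return sorted(
        task
        for task, task_topic in offsets_topic_by_task.items()
        if task_topic == topic and task != hung_task
    )


# Hypothetical cluster: connectors "a" and "b", two tasks each.
shared = {task: "connect-offsets" for task in ("a-0", "a-1", "b-0", "b-1")}
dedicated = {
    "a-0": "offsets-a", "a-1": "offsets-a",
    "b-0": "offsets-b", "b-1": "offsets-b",
}

print(blocked_tasks(shared, "a-0"))     # tasks of both connectors wait
print(blocked_tasks(dedicated, "a-0"))  # only the sibling task waits
```

With a shared topic the hung task stalls tasks of both connectors; with
per-connector topics only its sibling task is affected, which is the
different-connectors case described above.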

> Regardless of whether exactly.once.source.enabled is set to true for the
> worker, if a connector configuration contains a value for the
> offsets.storage.topic property, it will use an offsets topic with that
> name on the Kafka cluster that it produces data to (which may be different
> from the one that hosts the worker's global offsets topic).


It's implied but not explicitly stated that this will result in duplicating
the offsets for EOS source connectors: the workers will continue to track
all source offsets in their own offsets topic, and EOS-enabled source
connectors will also track their own offsets in connector-specific offsets
topics. It might be worth making that more obvious, along with which offsets
will be used upon connector restarts.

> If a connector is explicitly or implicitly configured to use a separate
> offsets topic but that topic does not exist yet, task and connector
> instances will automatically try to create the topic before startup.


Are you suggesting that the source connector and task implementations be
responsible for this? Or are you suggesting that the worker will be
responsible for creating the offsets topics on behalf of source connector
and task implementations (using an admin client per the configurations),
just as the worker currently does for the cluster's offsets topic?
Please clarify this in the KIP.

> If a worker is active and does not have support for exactly-once delivery...


Do you think it's worthwhile adding a parenthetical clause after this
statement that says "(either `exactly.once.source.enabled=false` or an
older Connect worker that does not have EOS support for source
connectors)"? Yes, those are called out above, but IMO this would help
clarify exactly what this paragraph means for operators -- and how it is
not a burden for them to avoid this condition.

In the "Connector principal permissions" section, the "IdempotentWrite" row
says "Kafka cluster targeted by the Connect cluster" for the resource name
column. Is this right? Should that instead be "Kafka cluster to which the
Connector writes"?

In the "Rolling upgrade(s)" section, should the first and second rolling
upgrades be completed before enabling EOS on any source connector? If so,
should this section call this out in more detail?

In the "Downgrade" section:

> Two kinds of downgrade are possible. Exactly-once support for a cluster can
> be disabled by setting exactly.once.source.enabled to false for workers in
> the cluster (a “soft” downgrade), or workers can be reverted to use older
> versions of the Connect framework that does not support exactly-once
> sources at all (a “hard” downgrade).


How does this work with a rolling "downgrade"? The "Heterogeneous clusters"
subsection of the "Permitted failure scenarios" section talks about (I
think) how setting `exactly.once.source.enabled=false` for one worker
effectively breaks the EOS delivery guarantees for the whole Connect
cluster. If that is the case, how does this rolling downgrade work? Would
it simply require that all source connectors configured to use EOS be
changed to not use EOS, and then to do a rolling downgrade of the worker's
configuration to disable EOS? If this is the case, let's be more explicit
in this section on downgrading.

In the "Soft downgrade" section, it sounds like this is effectively not a
rolling downgrade. Nothing in the "Downgrades" section really implies that
rolling downgrades are not supported, but at the same time it's not
terribly obvious.
Overall, it might be worth considering (if you haven't already) modifying
the Downgrade section to be a bit more prescriptive in terms of steps that
should be taken.

Finally, do we need to expose any metrics related to, for example, the
average number of records in each transaction? Or should we rely solely
upon existing producer metrics?

Best regards,

Randall

On Tue, May 4, 2021 at 4:11 PM Chris Egerton <chr...@confluent.io.invalid>
wrote:

> Hi all,
>
> Good news everyone! I've reworked the design one more time and hopefully
> some of the improvements here should make the proposal more palatable.
> TL;DR:
>
> - Rolling upgrades are now possible, in at most two phases; workers will
> first have to be given a binary upgrade to 3.0 (the targeted version for
> this feature) which can be a rolling upgrade, and then a rolling upgrade to
> enable exactly-once source support in a cluster should be possible with no
> anticipated downtime for source connectors or their tasks
> - Offset topic migration is completely removed in favor of fallback to the
> global offsets topic (much simpler!)
> - One backwards-incompatible change is introduced: the leader will be
> required to use a transactional producer for writes to the config topic
> regardless of whether exactly-once support is enabled on the cluster.
> Technically we could gate this behind a config property but since the
> benefits of a transactional producer actually extend beyond exactly-once
> source support (we can now ensure that there's only one writer to the
> config topic at any given time, which isn't guaranteed with the current
> model) and the cost to accommodate it is fairly low (a handful of
> well-defined and limited-scope ACLs), I erred on the side of keeping things
> simple
>
> Looking forward to the next round of review and really hoping we can get
> the ball rolling in time for this to land with 3.0!
>
> Cheers,
>
> Chris
>
> On Mon, Apr 12, 2021 at 7:51 AM Chris Egerton <chr...@confluent.io> wrote:
>
> > Hi Randall,
> >
> > After thinking things over carefully, I've done some reworking of the
> > design. Instead of performing zombie fencing during rebalance, the leader
> > will expose an internal REST endpoint that will allow workers to request a
> > round of zombie fencing on demand, at any time. Workers will then hit this
> > endpoint after starting connectors and after task config updates for
> > connectors are detected; the precise details of this are outlined in the
> > KIP. If a round of fencing should fail for any reason, the worker will be
> > able to mark its Connector failed and, if the user wants to retry, they can
> > simply restart the Connector via the REST API (specifically, the POST
> > /connectors/{connector}/restart endpoint).
> >
> > The idea I'd been playing with to allow workers to directly write to the
> > config topic seemed promising at first, but it allowed things to get pretty
> > hairy for users if any kind of rebalancing bug took place and two workers
> > believed they owned the same Connector object.
> >
> > I hope this answers any outstanding questions and look forward to your
> > thoughts.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Mar 22, 2021 at 4:38 PM Chris Egerton <chr...@confluent.io>
> > wrote:
> >
> >> Hi Randall,
> >>
> >> No complaints about email size from me. Let's dive in!
> >>
> >> 1. Sure! Especially important in my mind is that this is already possible
> >> with Connect as it is today, and users can benefit from this with or
> >> without the expanded exactly-once source support we're trying to add with
> >> this KIP. I've added that info to the "Motivation" section and included a
> >> brief overview of the idempotent producer in the "Background and
> >> References" section.
> >>
> >> 2. I actually hadn't considered enabling exactly-once source support by
> >> default. Thinking over it now, I'm a little hesitant to do so just
> because,
> >> even with the best testing out there, it's a pretty large change and it
> >> seems safest to try to make it opt-in in case there's unanticipated
> >> fallout. Then again, when incremental cooperative rebalancing was
> >> introduced, it was made opt-out instead of opt-in. However, ICR came
> with
> >> lower known risk of breaking existing users' setups; we know for a fact
> >> that, if you haven't granted your worker or connector principals some
> ACLs
> >> on Kafka, your connectors will fail. In an ideal world people would
> >> carefully read upgrade notes and either grant those permissions or
> disable
> >> the feature before upgrading their Connect cluster to 3.0, but if they
> >> don't, they'll be in for a world of hurt. Overall I'd be more
> comfortable
> >> letting this feature incubate for a little bit to let everyone get
> familiar
> >> with it before possibly enabling it in 4.0 by default; what do you
> think?
> >>
> >> 3. I didn't think too long about the name for the offsets topic
> property;
> >> it seemed pretty unlikely that it'd conflict with existing connector
> >> property names. One alternative could be to follow the idiom
> established by
> >> KIP-458 and include the word "override" in there somewhere, but none of
> the
> >> resulting names seem very intuitive ("offsets.storage.override.topic"
> seems
> >> the best to me but even that isn't super clear). Happy to field
> suggestions
> >> if anyone has any alternatives they'd like to propose.
> >>
> >> 4. I _really_ wanted to enable per-connector toggling of this feature
> for
> >> the exact reasons you've outlined. There's already a couple cases where
> >> some piece of functionality was introduced that users could only
> control at
> >> the worker config level and, later, effort had to be made in order to
> add
> >> per-connector granularity: key/value converters and connector Kafka
> clients
> >> are the two at the top of my head, and there may be others. So if at all
> >> possible, it'd be great if we could support this. The only thing
> standing
> >> in the way is that it allows exactly-once delivery to be compromised,
> which
> >> in my estimation was unacceptable. I'm hoping we can make this feature
> >> great enough that it'll work with most if not all source connectors out
> >> there, and users won't even want to toggle it on a per-connector basis.
> >> Otherwise, we'll have to decide between forcing users to split their
> >> connectors across two Connect clusters (one with exactly-once source
> >> enabled, one with it disabled), which would be awful, or potentially
> seeing
> >> duplicate record delivery for exactly-once connectors, which is also
> awful.
> >>
> >> 5. Existing source connectors won't necessarily have to be configured
> >> with "consumer.override" properties, but there are a couple of cases
> where
> >> that will be necessary:
> >>
> >> a. Connector is running against a secured Kafka cluster and the
> >> principal configured in the worker via the various "consumer." properties
> >> (if there is one) doesn't have permission to access the offsets topic that
> >> the connector will use. If no "consumer.override." properties are specified
> >> in this case, the connector and its tasks will fail.
> >>
> >> b. Connector is running against a separate Kafka cluster from the one
> >> specified in the worker's config with the "bootstrap.servers" property (or
> >> the "consumer.bootstrap.servers" property, if specified). For example,
> >> the connector is configured with a "producer.override.bootstrap.servers"
> >> property. If no corresponding "consumer.override.bootstrap.servers"
> >> property is specified by the connector (or one is provided but doesn't
> >> match the one used for the producer), bad things will happen--best case,
> >> the connector refuses to start up because it's configured with a separate
> >> offsets topic and no offsets migration is able to take place; worst case,
> >> the connector starts reading offsets from a stale topic and every time a
> >> task is restarted, duplicate records ensue. Since this can compromise
> >> delivery guarantees I think action is worth taking here, and I'm glad
> >> you've raised this scenario. Proposed approach: If the user hasn't
> >> specified a "consumer.override.bootstrap.servers" property in their
> >> connector config and the "bootstrap.servers" property that we'd configure
> >> its consumer with (based on either the "consumer.bootstrap.servers" worker
> >> property or, if not provided, the "bootstrap.servers" worker property) is
> >> different from the "bootstrap.servers" that we'd configure its producer
> >> with, we can log a warning but then automatically configure the consumer
> >> with the "bootstrap.servers" that are going to be used for the producer.
> >> If the user has specified a "consumer.override.bootstrap.servers" property
> >> in their connector config and it differs from the one that the connector's
> >> producer will use, we can fail the connector and all of its tasks, in
> >> order to prevent delivery guarantees from being compromised. Thoughts?
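
The proposed approach can be sketched roughly as follows. This is only an
illustration under stated assumptions, not Connect's actual code: configs
are plain dicts, server lists are compared by exact string equality, and the
producer is assumed to fall back to a worker-level
"producer.bootstrap.servers" property before "bootstrap.servers". The
function names and the `ConnectorConfigError` class are hypothetical.

```python
import logging

log = logging.getLogger("connect.sketch")


class ConnectorConfigError(Exception):
    """Hypothetical error used to fail the connector and its tasks."""


def producer_bootstrap(worker, connector):
    # Assumed precedence: connector override, worker "producer." prefix,
    # then the worker's plain bootstrap.servers.
    return (
        connector.get("producer.override.bootstrap.servers")
        or worker.get("producer.bootstrap.servers")
        or worker["bootstrap.servers"]
    )


def consumer_bootstrap(worker, connector):
    producer_servers = producer_bootstrap(worker, connector)
    override = connector.get("consumer.override.bootstrap.servers")
    if override is not None:
        # Explicit override that disagrees with the producer: fail.
        if override != producer_servers:
            raise ConnectorConfigError(
                "consumer.override.bootstrap.servers differs from the "
                "producer's bootstrap.servers"
            )
        return override
    inherited = (
        worker.get("consumer.bootstrap.servers")
        or worker["bootstrap.servers"]
    )
    if inherited != producer_servers:
        # No explicit override: warn, then follow the producer.
        log.warning("Using the producer's bootstrap.servers for the consumer")
        return producer_servers
    return inherited


worker = {"bootstrap.servers": "worker-kafka:9092"}
connector = {"producer.override.bootstrap.servers": "target-kafka:9092"}
print(consumer_bootstrap(worker, connector))
```

The asymmetry is deliberate in this sketch: an inherited mismatch is
corrected with a warning, while an explicit mismatch is treated as a user
error and fails the connector.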
> >>
> >> 6. Sure, I've updated the KIP with that change in all the places it
> >> applies.
> >>
> >> 7. Sure, I've updated the KIP to be clear that these methods will also
> be
> >> invoked for the end-of-life offset commit of a task scheduled for
> shutdown.
> >>
> >> 8.
> >>
> >> > Won't there also be transaction contention just within a single
> >> > connector using multiple tasks? Even if that connector has all offsets go
> >> > to a dedicated topic (separate from all other connectors), then isn't it
> >> > still possible/likely that a transaction from task 1 blocks a transaction
> >> > from task 2?
> >>
> >> Yes, this is completely correct.
> >>
> >> > How will this affect latency perceived by consumers (e.g., the
> >> > records from task 2's transaction don't become visible until the
> >> > transactions from task 1 and task 2 commit)?
> >>
> >> Worst case, task startup and requests from existing tasks to read from
> >> the offsets topic will block for up to the producer transaction timeout
> for
> >> the hung transaction. However, this is fairly unlikely as it would
> require
> >> two things to happen (or not happen):
> >>
> >> a. The task would have to die in between writing at least one record to
> >> the offsets topic and committing the transaction, which in most cases
> >> should be a fairly small window.
> >>
> >> b. The task would have to not be restarted (either manually by the user,
> >> or automatically by some monitoring system) before then. If the task
> were
> >> to be restarted, it'd cause any hanging transactions from older task
> >> instances to be aborted immediately.
> >>
> >> I can add some of this to the KIP if you think it's worth calling out,
> >> just let me know.
> >>
> >> 9. I'll try to clear it up but I'm hoping that the text very soon after
> >> that section helps clarify this point already: "the worker will
> continue to
> >> read to the end of the topic until either offset information for the
> >> connector or a sentinel value is found". Is the muddiness here perhaps
> >> because it's not explicitly stated that sentinel values will apply on a
> >> per-connector basis (as opposed to, e.g., a per-offsets-topic basis)?
> >>
> >> 10.
> >>
> >> > This section says "then workers will not bring up tasks for the
> >> > connector." What does this mean? Does it mean the tasks fail?
> >>
> >> I'm not in favor of failing tasks since that would either require manual
> >> intervention on the part of the user or go against the current flow of
> >> failed task recovery (i.e., requiring manual intervention) by
> automatically
> >> transitioning the task from the "FAILED" state to "RUNNING" once the
> >> condition for safe startup was met. Task restarts would also have no
> effect
> >> in this case, and putting the task into a state where a restart is
> >> frequently the appropriate remedial action for users could be
> misleading.
> >>
> >> I was hoping we could leave existing tasks in the UNASSIGNED state.
> >> Warning log messages can be emitted notifying the user that the worker
> was
> >> assigned tasks but is not able to start them yet due to constraints
> >> introduced by exactly-once source support.
> >>
> >> I suspect there may be some more discussion on this topic before a
> design
> >> is settled on, so I'll hold off on updating the KIP for now.
> >>
> >> > How do users recover from this?
> >>
> >> (Addressed with response to 11)
> >>
> >> 11.
> >>
> >> > What exactly does "refuse to bring the task up" mean?
> >>
> >> I hope I've covered this with the information from my response to part
> 10.
> >>
> >> > Also, "until a rebalance occurs" suggests that another rebalance will
> >> > be forthcoming? What would cause that subsequent rebalance? Isn't it
> >> > possible that there is no subsequent rebalance for a potentially
> >> > significant amount of time?
> >>
> >> (And, from 10):
> >>
> >> > How do users recover from this?
> >>
> >> This area is a little rough and more detail is definitely warranted.
> >> Unfortunately, there's also some open questions that, after some
> thought,
> >> aren't really covered by the design I had in mind.
> >>
> >> I was originally thinking it'd go something like this: leader fails to
> >> fence out producers in time during rebalance, rebalance continues,
> workers
> >> all notice that latest task configs for the connector are after the
> latest
> >> task count record, workers don't bring up tasks (leaving them
> UNASSIGNED),
> >> then eventually leader is able to finish fencing out producers, writes a
> >> new task count record to the config topic, and after the ensuing
> rebalance,
> >> all workers begin to bring up tasks for that connector.
> >>
> >> Unfortunately, a key assumption here is that the leader will eventually
> >> succeed at fencing out producers for the connector, which isn't always
> >> going to be the case if, for example, it lacks the right ACLs on the
> Kafka
> >> cluster. If the leader fails to perform zombie fencing with a
> non-retriable
> >> error, it's important that this information be surfaced to the user, and
> >> log messages and even JMX metrics don't seem powerful enough for this
> case.
> >>
> >> I've toyed with the idea of allowing non-leader workers to take on the
> >> work of zombie fencing and producing task count records to the config
> >> topic. We could then assign responsibility for this action to the worker
> >> that owns the Connector object for a connector. This would also require
> >> task configs to be written to the config topic by those same workers,
> since
> >> otherwise the worker that owns the Connector object might accidentally
> >> write a task count record to the config topic immediately after the
> leader
> >> has written a new set of task configs, and it would look like the new
> set
> >> of task configs is ready to use even though the task count record
> actually
> >> corresponded to an earlier set of task configs (in fact, this is
> actually a
> >> possibility with the current design if a zombie leader writes new task
> >> configs to the config topic during a rolling upgrade).
> >>
> >> With that in mind, I think an improved design might be something like
> >> this:
> >>
> >> a. Connector is reconfigured
> >> b. The worker that owns the Connector object for that connector uses a
> >> transactional producer whose ID corresponds to the connector name to
> write
> >> those task configs to the config topic
> >> c. Workers pick up those new task configs and preemptively attempt to
> >> shut down all tasks for that connector
> >> d. Rebalance ensues
> >> e. Worker that owns the Connector object for that connector performs a
> >> round of zombie fencing and then, using a transactional producer whose ID
> >> corresponds to the connector name, writes the task count record for that
> >> connector to the config topic
> >> f. Workers pick up the task count record and start tasks for that
> >> connector
> >>
> >> If step e) fails with a non-retriable error, the worker can mark the
> >> Connector object "FAILED" in the status API with a stack trace
> >> corresponding to the error encountered while trying to perform the
> round of
> >> zombie fencing, and a message stating that no new tasks for the
> connector
> >> can be started until this problem is addressed. And, if the user tries
> to
> >> restart the connector, another round of zombie fencing can be attempted.
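
The failure handling described above for the fencing step might look
something like this. It is a rough sketch only: `fence_zombies` and
`write_task_count` are hypothetical callables, the status store is a plain
dict, and every exception is treated as non-retriable for simplicity.

```python
import traceback


def fence_and_publish(connector, fence_zombies, write_task_count, statuses):
    """Run one round of zombie fencing, then publish the task count record.

    fence_zombies and write_task_count are hypothetical callables supplied
    by the caller. For simplicity every exception is treated as
    non-retriable here.
    """
    try:
        fence_zombies(connector)
    except Exception:
        # Surface the error through the status API instead of only logging.
        statuses[connector] = {
            "state": "FAILED",
            "trace": traceback.format_exc(),
        }
        return False
    write_task_count(connector)
    statuses[connector] = {"state": "RUNNING"}
    return True


def missing_acls(connector):
    raise PermissionError("not authorized to fence producers")


statuses, published = {}, []
# First attempt fails, so no task count record is written.
fence_and_publish("my-connector", missing_acls, published.append, statuses)
print(statuses["my-connector"]["state"], published)
# A user-triggered restart retries fencing, here with the ACLs fixed.
fence_and_publish("my-connector", lambda c: None, published.append, statuses)
print(statuses["my-connector"]["state"], published)
```

The task count record is only written after fencing succeeds, so tasks are
never started against a generation that has not been fenced out.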
> >>
> >> How does this sound to you, especially in terms of correctness (i.e.,
> not
> >> adding potential holes in delivery guarantees) and user experience? If
> it
> >> looks good I'll update the KIP accordingly, but I suspect there might
> still
> >> be some discussion on this topic.
> >>
> >> Further discussion on the bug where a zombie leader writes task configs
> >> to the config topic after a task count record has been produced during a
> >> rolling upgrade to come later as a response to 15.
> >>
> >> 12.
> >>
> >> > What exactly does "abort startup of the task" mean?
> >> > How do users recover from this? Again, more explicit behavior would be
> >> > helpful.
> >>
> >> "Abort" is probably the wrong term to use here. "Abandon and re-attempt"
> >> might be better. Essentially, we'd discard the current task, but with
> the
> >> assumption that a new one has either already been brought up in its
> place,
> >> or will be soon as a response to the rebalance caused by the new set of
> >> task configurations for the connector. No action on the part of the user
> >> should be required here. I've updated the section to add this info.
> >>
> >> 13.
> >>
> >> > Isn't it possible given N producer transactional IDs for there to be up
> >> > to N transaction coordinators?
> >>
> >> I believe there is only one transaction coordinator per broker, so we'd
> >> only have up to B transaction coordinators where B is the number of
> >> brokers.
> >>
> >> > Does this reduce the likelihood that it can be
> >> > completed successfully? What risk does this play in rebalances being
> >> > unsuccessful?
> >>
> >> Regardless of the info above, these are still valid questions. I think
> >> the short answer to the first part is "yes"--the more transaction
> >> coordinators you have to reach at once, the more likely it becomes that
> >> something goes wrong somewhere. With regards to "successful" or
> >> "unsuccessful" rebalances, I'm guessing you're really asking about the
> >> round of zombie fencing being performed during them, since the only impact
> >> on the rebalance itself will be that things might take a little longer
> >> with the leader possibly blocking during assignment. As far as the viability of
> >> zombie fencing goes, it does introduce a point of failure that will
> prevent
> >> tasks from being brought up for a connector, which isn't great.
> However, if
> >> one or more transaction coordinators associated with a connector's tasks
> >> aren't available, those tasks can't be brought up anyways, since it
> can't
> >> be guaranteed that the old generation has been fenced out. I'm
> optimistic
> >> that, with the improvements (and the retry path offered) in response to
> 11,
> >> this won't be a showstopper and won't cause too much pain for users.
> >>
> >> 14. Thanks, added.
> >>
> >> 15.
> >>
> >> > IIUC, the net effect is that rolling upgrades don't really work for
> >> > source connectors.
> >>
> >> Yep, that's correct. If you get unlucky and the worker that owns the
> >> Connector object for your connector is the last one to be upgraded, it
> >> won't be able to perform the offset migration until the upgrade is
> >> complete, and all of the tasks for that connector will be stuck idling
> in
> >> the meantime waiting for that to complete.
> >>
> >> > Wouldn't it just be easier and maybe faster to stop and restart
> >> > the whole cluster?
> >>
> >> Sure, if you're unconcerned with any sink connectors that might be
> >> running on the cluster. I've also realized that, with the possibility of
> >> zombie leaders writing task configs to the config topic during a rolling
> >> upgrade (and possibly doing so right before a task count record is
> >> written), there's some vulnerability and confusion that might come about
> >> during a rolling upgrade and turn users off of the feature. We can
> >> definitely engineer around this (possibly by including some kind of flag
> >> with task configs that indicates that they've been written by a worker
> >> using a transactional producer, and ignoring any task configs that don't
> >> use that flag), but at this point I'm kind of leaning in the direction
> of
> >> requiring (or at least, strongly recommending) a mass upgrade for
> clusters
> >> when enabling exactly-once source support, since no matter what, they're
> >> still going to see downtime with source connectors. Curious what your
> >> thoughts are on this, though.
> >>
> >> > Also, what are the states of the source tasks during
> >> > this time?
> >>
> >> Covered above in response to another question, but to reiterate: they
> >> will be UNASSIGNED.
> >>
> >> 16. Good call, added.
> >>
> >> 17. Sure, I'll add a "Limitations" section with the points you've
> >> described and any others that seem applicable.
> >>
> >>
> >> Thanks again for taking a look!
> >>
> >> Cheers,
> >>
> >> Chris
> >>
> >> On Mon, Mar 22, 2021 at 11:39 AM Chris Egerton <chr...@confluent.io>
> >> wrote:
> >>
> >>> Hi Randall,
> >>>
> >>> Thanks for taking a look! Responses inline:
> >>>
> >>> > This would seem to make the transactions based upon time rather than
> >>> record
> >>> count. Is that really the intent? Did you consider using one
> transaction
> >>> per "batch" of records, whatever number of records that means? One
> issue
> >>> with the time-based boundary is that `offset.flush.interval.ms`
> >>> defaults to
> >>> 60 seconds, and that could mean many hundreds of thousands of records
> on
> >>> a
> >>> high throughput task. (BTW, I do agree with the KIP's rejected
> >>> alternative
> >>> to not expose to the task or let the task define the transaction
> >>> boundaries.)
> >>>
> >>> Yep, this was the intent. I covered a few alternatives in the "Finer
> >>> control over offset commits" section in the rejected alternatives all
> the
> >>> way at the bottom, including allowing source tasks to define their own
> >>> transaction boundaries, allowing per-connector source offset
> intervals, and
> >>> allowing offsets to be committed after a certain number of records.
> While
> >>> writing this up I also realized that, besides establishing batch
> boundaries
> >>> based on the number of records, another possibility could also be to
> count
> >>> each collection of records returned from a single call to
> SourceTask::poll
> >>> as its own batch, although this might seriously hamper throughput for
> tasks
> >>> that provide a small number of records at a time, but do so with high
> >>> frequency.
> >>>
> >>> I was hoping we could save this for future work since it's unclear that
> >>> it would be necessary to operate Connect with exactly-once source
> support.
> >>> It almost seems like a bit of a leaky abstraction for users to have to
> >>> worry about the details of offset commit logic, and I'm hoping we can
> do
> >>> some magic behind the curtains to avoid this. Here's a few thoughts on
> how
> >>> we can keep fixed-time-interval offset flushing with exactly-once
> source
> >>> support without making things worse for users:
> >>>
> >>> 1. Ignore the offset flush timeout parameter for exactly-once source
> >>> tasks. They should be allowed to take as long as they want to complete
> an
> >>> offset commit, since the cost of failure at that point is to fail the
> >>> source task. This might require a change to how Connect manages source
> task
> >>> offset commits right now since a single worker-global thread is used to
> >>> schedule and perform offset commits for all source tasks, and any
> blocked
> >>> task will also block offset commits for all other tasks, but I'm
> confident
> >>> that we can find a way to solve this problem and would like to leave
> it as
> >>> an implementation detail that doesn't need to be covered extensively in
> >>> this KIP.
> >>>
> >>> 2. The only other problem I foresee with high-throughput tasks like the
> >>> ones you described is that it may take longer than the transaction timeout
> >>> for a task to flush all of its records to Kafka. In this case, there are
> >>> two actions that users can take to nurse their connector back to health:
> >>> reconfigure it to produce records with a lower throughput, or increase the
> >>> transaction timeout for the producers used by the connector. We can
> >>> include these steps in the error message for a task that fails due to
> >>> producer transaction timeout.
> >>>
> >>> Does this seem reasonable? If so, I'll add this info to the design doc.
> >>>
> >>>
> >>> > Gwen's #5 question was about rebalances of a source connector. Your
> >>> answer
> >>> made sense, but while the KIP talks about source tasks being stopped
> >>> before
> >>> the connector is rebalanced, I'm wondering about the delay or other
> >>> impact
> >>> this has on the rebalance itself. The "Cannot fence out producers
> during
> >>> rebalance" section talks about fencing out producers, but IIUC this is
> >>> not
> >>> the same as:
> >>>
> >>> > ... all workers will preemptively stop all tasks of all source
> >>> connectors
> >>> for which task configurations.
> >>>
> >>> > If this is already addressed in the KIP, please just point me to that
> >>> section.
> >>>
> >>> Not addressed yet :) I haven't called out a potential impact on
> >>> rebalance because, although there will probably be one, I don't believe
> >>> it'll be substantial. After a connector is reconfigured and its new
> task
> >>> configs are written to the config topic, the sequence of events for a
> >>> single worker would be:
> >>>
> >>> 1. New task configs are read from the config topic
> >>> 2. All tasks for that connector are stopped in parallel (which is
> >>> already how en-masse task start/stop is performed by a distributed
> worker)
> >>> 3. Any tasks that take longer than the graceful task shutdown timeout
> >>> are abandoned and allowed to continue running
> >>> 4. Worker rejoins the group
> >>>
> >>> The only new impact on rebalance might come from steps 2-3, but it's
> >>> unlikely to be significant. Distributed workers currently use a pool of
> >>> eight threads to handle en-masse start/stop of tasks and connectors,
> have a
> >>> default graceful task shutdown timeout of five seconds, and a default
> >>> rebalance timeout of sixty seconds. In order to block the worker for an
> >>> extra thirty seconds (only half of the rebalance timeout), there would
> have
> >>> to be (30 / 5) * 8 = 48 tasks that are either completely hung or drag
> their
> >>> feet for five seconds each during shutdown. This would have to either
> come
> >>> from a single connector, or a collection of connectors that were
> >>> reconfigured in extremely-rapid succession. The likelihood of this
> scenario
> >>> doesn't seem high enough to worry about initially, but if you feel
> >>> differently, we can consider some (hopefully lightweight)
> optimizations,
> >>> such as expanding the size of the distributed herder's start/stop
> thread
> >>> pool.
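
The arithmetic above can be checked with a small sketch. The function name and its parameters are hypothetical and not part of Connect; the defaults are simply the figures quoted in the message (a pool of eight threads and a five-second graceful shutdown timeout), and the model assumes every slow task uses up the full timeout.

```python
def slow_tasks_needed(block_seconds, shutdown_timeout_seconds=5, pool_threads=8):
    """Number of slow tasks needed to keep the start/stop pool busy for block_seconds.

    Assumes each slow task consumes the full graceful shutdown timeout and
    that the pool works through tasks in waves of pool_threads at a time.
    """
    waves = block_seconds / shutdown_timeout_seconds
    return int(waves * pool_threads)


# Thirty extra seconds of blocking with the defaults quoted above.
print(slow_tasks_needed(30))  # 48
```

Raising `pool_threads` raises the number of misbehaving tasks required proportionally, which is why expanding the thread pool is mentioned as a lightweight optimization.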
> >>>
> >>> One final point--this is also similar to the logic that was used for
> >>> eager rebalancing, except that with eager rebalancing, all tasks from
> all
> >>> connectors were stopped before rejoining a group, instead of just the
> tasks
> >>> for recently-updated connectors. Someone correct me if I'm wrong, but
> the
> >>> stopping of tasks before rejoining the group wasn't a pain point with
> eager
> >>> rebalancing; the biggest pain point was just that a change to one
> connector
> >>> would temporarily disable all connectors on the cluster.
> >>>
> >>> I'll update the KIP with a (brief) note on this in the "Fencing during
> >>> rebalance" section; let me know if the reasoning here seems valid.
> >>>
> >>>
> >>> Responses to your second email coming soon after I update the KIP with
> >>> the info from above.
> >>>
> >>> Cheers,
> >>>
> >>> Chris
> >>>
> >>> On Thu, Mar 18, 2021 at 7:28 PM Randall Hauch <rha...@apache.org>
> wrote:
> >>>
> >>>> Hey, Chris.
> >>>>
> >>>> Thanks again for all your hard work figuring out this complicated
> >>>> problem.
> >>>> While there are still issues to iron out, at the moment it seems
> pretty
> >>>> sensible.
> >>>>
> >>>> Here's my follow up email with additional comments/questions. I've
> >>>> organized these by section to help provide context, and I've numbered
> >>>> the
> >>>> questions to make it easier to reply/discuss. I apologize for the
> >>>> length of
> >>>> this email -- I thought it better to do this all at once rather than
> >>>> string
> >>>> out in several emails.
> >>>>
> >>>> Motivation
> >>>> 1. This section does not mention another important source of duplicate
> >>>> records: producer retries. Using the idempotent producer can prevent
> >>>> duplicates due to retries, but it can't help with anything else.
> Perhaps
> >>>> it's worth mentioning?
> >>>>
> >>>> Public Interfaces
> >>>> 2. Have we considered `exactly.once.source.enabled=true` by default in
> >>>> the
> >>>> worker config? We're currently on track for considering this KIP for
> >>>> inclusion in AK 3.0, so we are allowed backward incompatible changes.
> >>>> 3. It is possible (but maybe unlikely) that existing source connectors
> >>>> might already have a connector config property that matches the
> proposed
> >>>> new connector config `offsets.storage.topic`. It's good that this
> >>>> matches
> >>>> the worker's config, but were any other property names considered?
> >>>> 4. I see that one of the rejected alternatives was a connector-level
> >>>> option
> >>>> to disable EOS. For example, what happens if some source connectors
> >>>> don't
> >>>> work well with EOS (e.g., the transaction blocking behavior of a
> >>>> multi-task
> >>>> source connector is unacceptable)? What would we say if a user wants
> to
> >>>> use
> >>>> EOS for some source connectors but not others? Is it really viable to
> >>>> not
> >>>> allow that?
> >>>> 5. The worker source task will create a consumer for the offset topic
> >>>> used
> >>>> by the connector. Currently, Connect does not use or do anything with
> >>>> `consumer.override.*` properties, but won't they now likely be needed
> >>>> with
> >>>> a source connector using EOS? What happens with existing source
> >>>> connector
> >>>> configurations that don't define these?
> >>>>
> >>>> Proposed Changes
> >>>> Offset reads
> >>>> 6. The KIP says "a message will be logged notifying them of this
> fact."
> >>>> Should it be more specific, namely "a warning message..."?
> >>>>
> >>>> SourceTask record commit API
> >>>> 7. The second paragraph talks about why we cannot guarantee the
> methods
> >>>> be
> >>>> invoked after every successful commit, and I think that makes sense
> >>>> because
> >>>> there is no other practical way to handle worker failures. It does
> seem
> >>>> like it would be more useful to describe the expectation about calling
> >>>> these methods during normal task shutdown. Specifically, should
> >>>> connector
> >>>> developers expect that these methods will be called following a
> >>>> successful
> >>>> commit immediately prior to stopping the task (barring worker
> failures)?
> >>>>
> >>>> Per-connector offsets topic
> >>>> This section says:
> >>>>
> >>>> > Finally, it allows users to limit the effect that hanging
> >>>> transactions on
> >>>> > an offsets topic will have. If tasks A and B use the same offsets
> >>>> topic,
> >>>> > and task A initiates a transaction on that offsets topic right
> before
> >>>> task
> >>>> > B starts up, then task A dies suddenly without committing its
> >>>> transaction,
> >>>> > task B will have to wait for that transaction to time out before it
> >>>> can
> >>>> > read to the end of the offsets topic. If the transaction timeout is
> >>>> set
> >>>> > very high for task A (to accommodate bursts of high throughput, for
> >>>> > example), this will block task B from processing any data for a long
> >>>> time.
> >>>> > Although this scenario may be unavoidable in some cases, using a
> >>>> dedicated
> >>>> > offsets topic for each connector should allow cluster administrators
> >>>> to
> >>>> > isolate the blast radius of a hanging transaction on an offsets
> >>>> topic. This
> >>>> > way, although tasks of the same connector may still interfere with
> >>>> each
> >>>> > other, they will at least not interfere with tasks of other
> >>>> connectors.
> >>>> > This should be sufficient for most multitenant environments.
> >>>>
> >>>>
> >>>> 8. Won't there also be transaction contention just within a single
> >>>> connector using multiple tasks? Even if that connector has all offsets
> >>>> go
> >>>> to a dedicated topic (separate from all other connectors), then isn't
> it
> >>>> still possible/likely that a transaction from task 1 blocks a
> >>>> transaction
> >>>> from task 2? How will this affect latency perceived by consumers
> (e.g.,
> >>>> the
> >>>> records from task 2's transaction don't become visible until the
> >>>> transactions from task 1 and task 2 commit)?
> >>>>
> >>>> Migration
> >>>> 9. This section says:
> >>>>
> >>>> > the worker will read to the end of the offsets topic before starting
> >>>> the
> >>>> > task. If there are no offsets for the connector present in the topic
> >>>> and
> >>>> > there is no sentinel value...
> >>>>
> >>>> This sounds like the worker will know that a sentinel value is
> expected,
> >>>> but IIUC a worker cannot know that. Instead, the worker consuming the
> >>>> topic
> >>>> must read to the end and expect _either_ a sentinel value _or_
> committed
> >>>> offsets. Can the text make this more clear?
> >>>>
> >>>> Task count records
> >>>> 10. This section says "then workers will not bring up tasks for the
> >>>> connector." What does this mean? Does it mean the tasks fail? How do
> >>>> users
> >>>> recover from this? I think the "Addressed failure/degradation
> scenarios"
> >>>> section tries to address some of these, but any link is implicit.
> >>>>
> >>>> Source task startup
> >>>> 11. This section says "Otherwise, the worker will emit a warning log
> >>>> message and refuse to bring the task up until a rebalance occurs."
> What
> >>>> exactly does "refuse to bring the task up" mean? Also, "until a
> >>>> rebalance
> >>>> occurs" suggests that another rebalance will be forthcoming? What
> would
> >>>> cause that subsequent rebalance? Isn't it possible that there is no
> >>>> subsequent rebalance for a potentially significant amount of time?
> >>>> Again,
> >>>> any link between this section and the "Addressed failure/degradation
> >>>> scenarios" section is implicit.
> >>>>
> >>>> 12. Also, "Once a worker has instantiated a producer for a source
> task,
> >>>> it
> >>>> will read to the end of the config topic once more, and if a new set
> of
> >>>> task configurations for that connector has been generated, it will
> abort
> >>>> startup of the task." What exactly does "abort startup of the task"
> >>>> mean?
> >>>> How do users recover from this? Again, more explicit behavior would
> be
> >>>> helpful.
> >>>>
> >>>> Addressed failure/degradation scenarios
> >>>> Cannot fence out producer during rebalance
> >>>> 13. This section says:
> >>>>
> >>>> > The leader may be unable to fence out producers during a rebalance
> >>>> if, for
> >>>> > example, it is unable to reach or locate a transaction coordinator.
> >>>>
> >>>> Isn't it possible given N producer transactional IDs for there to be
> up
> >>>> to
> >>>> N transaction coordinators? Does this reduce the likelihood that it
> can
> >>>> be
> >>>> completed successfully? What risk does this play in rebalances being
> >>>> unsuccessful?
> >>>>
> >>>> Rolling upgrade
> >>>> 14. This section says:
> >>>>
> >>>> > Users can stop each worker, upgrade to a later version if necessary,
> >>>> set
> >>>> > exactly.once.source.enabled to true in the config, then restart, one
> >>>> by one.
> >>>>
> >>>> Nit: This is not as clear/precise as it probably needs to be. A subtle
> >>>> change makes this more precise:
> >>>>
> >>>> > Users can then do the following for each worker, one by one: stop
> the
> >>>> > worker, upgrade to a later version if necessary, set
> >>>> > exactly.once.source.enabled to true in the config, then restart.
> >>>>
> >>>>
> >>>> 15. A bit later in the same section:
> >>>>
> >>>> > This will have the effect that, until the leader is upgraded, no
> >>>> upgraded
> >>>> > workers will start source tasks.
> >>>>
> >>>> IIUC, the net effect is that rolling upgrades don't really work for
> >>>> source
> >>>> connectors. Wouldn't it just be easier and maybe faster to stop and
> >>>> restart
> >>>> the whole cluster? Also, what are the states of the source tasks
> during
> >>>> this time?
> >>>>
> >>>> 16. And a bit later:
> >>>>
> >>>> > Some of these offsets may be out-of-date or not match the ones in
> the
> >>>> > connector’s separate offsets topic
> >>>>
> >>>> The "not match" is probably looser than necessary. What about "older
> >>>> than"?
> >>>>
> >>>>
> >>>> Future Work
> >>>> Standalone mode support
> >>>> 17. The fact that EOS only works for distributed Connect was mentioned
> >>>> only
> >>>> once earlier in the document, but I missed it on my first pass and
> was a
> >>>> bit surprised when I read this. Perhaps we should be clearer on this
> >>>> earlier on in the document, perhaps by adding a "Limitations" section.
> >>>> There are probably several limitations:
> >>>>
> >>>>    - Only Connect's distributed worker supports EOS.
> >>>>    - EOS can be enabled only for all source connectors, and not only
> for
> >>>>    some.
> >>>>    - EOS works only for source connectors that assign source
> partitions
> >>>> to
> >>>>    at most one task at a time.
> >>>>
> >>>> I suspect this last limitation is likely to not be that severe, since
> >>>> such
> >>>> a connector would have non-deterministic restart behavior with the
> >>>> current
> >>>> Connect runtime, as each task would commit different source offsets
> >>>> using
> >>>> the same source partitions. Of course, a follow-up question
> >>>> then becomes: What is the impact of any source connector that doesn't
> >>>> adhere to this last limitation? (This is kind of addressed in the KIP,
> >>>> but
> >>>> may need to be updated/moved if we add the Limitations section.)
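
To make the last limitation concrete, here is a minimal sketch of how one might detect a connector that hands the same source partition to more than one task. Everything in it is hypothetical: Connect does not inspect task configurations today, so the `assignments` mapping (task id to source partitions) is an assumed input, not an existing API.

```python
from collections import defaultdict


def shared_source_partitions(assignments):
    """Return the source partitions claimed by more than one task.

    `assignments` maps a task id to the source partitions it reads.
    The result maps each shared partition to the tasks that claim it.
    """
    owners = defaultdict(list)
    for task_id, partitions in assignments.items():
        for partition in partitions:
            owners[partition].append(task_id)
    return {p: tasks for p, tasks in owners.items() if len(tasks) > 1}


# Hypothetical assignment in which tasks 0 and 1 both read table "b".
print(shared_source_partitions({0: ["a", "b"], 1: ["b", "c"]}))  # {'b': [0, 1]}
```

An empty result would mean the connector satisfies the "at most one task per source partition" condition for that assignment.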
> >>>>
> >>>> Best regards,
> >>>>
> >>>> Randall
> >>>>
> >>>>
> >>>> On Thu, Mar 18, 2021 at 5:26 PM Randall Hauch <rha...@apache.org>
> >>>> wrote:
> >>>>
> >>>> > Chris,
> >>>> >
> >>>> > Thanks for the well-written KIP and the more recent updates. It's
> >>>> exciting
> >>>> > to envision EOS for Connect source connectors!
> >>>> >
> >>>> > I'll follow up this email with another that goes into specific
> >>>> comments
> >>>> > and questions not already touched on by others. This email addresses
> >>>> > comments and questions previously discussed on this thread.
> >>>> >
> >>>> > I'd like to +1 your response to Gwen's question about
> >>>> connector-specific
> >>>> > offsets topic (#2 in her email). But importantly, a connector can
> >>>> override
> >>>> > any producer settings, including Kafka credentials that don't have
> >>>> > permissions on the internal Connect offsets topic. For this reason,
> I
> >>>> think
> >>>> > it's necessary for this KIP to address those scenarios.
> >>>> >
> >>>> > Gwen's #4 question was about how transactions relate to the batches
> >>>> > source connector tasks return from their `poll()` method, and you
> >>>> followed
> >>>> > up by clarifying the document to make this more clear. IIUC, the KIP
> >>>> > suggests still using the existing `offset.flush.interval.ms`
> >>>> property and
> >>>> > then committing transactions based (solely?) upon the value of the
> >>>> > property. This would seem to make the transactions based upon time
> >>>> rather
> >>>> > than record count. Is that really the intent? Did you consider using
> >>>> one
> >>>> > transaction per "batch" of records, whatever number of records that
> >>>> means?
> >>>> > One issue with the time-based boundary is that `
> >>>> offset.flush.interval.ms`
> >>>> > defaults to 60 seconds, and that could mean many hundreds of
> >>>> thousands of
> >>>> > records on a high throughput task. (BTW, I do agree with the KIP's
> >>>> > rejected alternative to not expose to the task or let the task
> define
> >>>> the
> >>>> > transaction boundaries.)
> >>>> >
> >>>> > Gwen's #5 question was about rebalances of a source connector. Your
> >>>> answer
> >>>> > made sense, but while the KIP talks about source tasks being stopped
> >>>> before
> >>>> > the connector is rebalanced, I'm wondering about the delay or other
> >>>> impact
> >>>> > this has on the rebalance itself. The "Cannot fence out producers
> >>>> during
> >>>> > rebalance" section talks about fencing out producers, but IIUC this
> >>>> is not
> >>>> > the same as:
> >>>> >
> >>>> > ... all workers will preemptively stop all tasks of all source
> >>>> connectors
> >>>> > for which task configurations.
> >>>> >
> >>>> >
> >>>> > If this is already addressed in the KIP, please just point me to
> that
> >>>> > section.
> >>>> >
> >>>> > Finally, regarding Guozhang's question about "partial rebalances"
> >>>> rather
> >>>> > than fencing out all producers. In your initial response, you said
> the
> >>>> > following:
> >>>> >
> >>>> > Connectors can arbitrarily reassign source partitions (such as
> >>>> > database tables, or Kafka topic partitions) across tasks, so even if
> >>>> the
> >>>> > assignment of tasks across workers remains unchanged, the assignment
> >>>> of
> >>>> > source partitions across those workers might.
> >>>> >
> >>>> >
> >>>> > This is a very important point. In fact, there are definitely
> >>>> connectors
> >>>> > that shuffle around "source partition" assignments to different
> >>>> tasks, and
> >>>> > IMO we have to take that into consideration because Connect provides
> >>>> no
> >>>> > guidance on this and because there are good reasons to do this. So
> >>>> I'm glad
> >>>> > the KIP takes this into account. This is actually one of the issues
> I
> >>>> ran
> >>>> > into during an earlier attempt to POC EOS for source connectors, and
> >>>> the
> >>>> > ability to force fencing is a novel (albeit heavy-handed) way to
> deal
> >>>> with
> >>>> > that.
> >>>> >
> >>>> > As I said, I'll follow up in subsequent emails on other specific
> >>>> questions
> >>>> > I have.
> >>>> >
> >>>> > Best regards,
> >>>> >
> >>>> > Randall
> >>>> >
> >>>> > On Mon, Feb 22, 2021 at 6:45 PM Guozhang Wang <wangg...@gmail.com>
> >>>> wrote:
> >>>> >
> >>>> >> This is a great explanation, thank you!
> >>>> >>
> >>>> >> On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton <chr...@confluent.io
> >
> >>>> >> wrote:
> >>>> >>
> >>>> >> > Hi Guozhang,
> >>>> >> >
> >>>> >> > Your understanding should be correct in most cases, but there are
> >>>> two
> >>>> >> finer
> >>>> >> > points that may interest you:
> >>>> >> >
> >>>> >> > 1. It's technically dependent on the implementation of the
> >>>> connector and
> >>>> >> > how it chooses to allocate source partitions across its tasks;
> >>>> even if
> >>>> >> the
> >>>> >> > number of tasks and source partitions remains completely
> >>>> unchanged, a
> >>>> >> > connector may still choose to shuffle around the partition->task
> >>>> >> > allocation. I can't think of any cases where this might happen
> off
> >>>> the
> >>>> >> top
> >>>> >> > of my head, but it seemed worth sharing given the educational
> >>>> nature of
> >>>> >> the
> >>>> >> > question.
> >>>> >> > 2. It's also possible that the number of source partitions
> remains
> >>>> >> > unchanged, but the set of source partitions changes. One case
> >>>> where this
> >>>> >> > might happen is with a database connector monitoring for tables
> >>>> that
> >>>> >> match
> >>>> >> > a given regex every five minutes; if a table that matched that
> >>>> regex
> >>>> >> during
> >>>> >> > the last scan got assigned to a task and then dropped, and then
> >>>> another
> >>>> >> > table that matched the regex got added before the next scan, the
> >>>> >> connector
> >>>> >> > would see the same number of tables, but the actual set would be
> >>>> >> different.
> >>>> >> > At this point, it would again be connector-dependent for whether
> >>>> the
> >>>> >> > already-assigned tables stayed assigned to the same tasks. Is
> >>>> anyone
> >>>> >> else
> >>>> >> > reminded of the various consumer partition assignment strategies
> >>>> at this
> >>>> >> > point?
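
The second point can be illustrated with a short sketch: comparing only the number of source partitions misses a change in their membership. The table names below are made up for illustration, and the helper is hypothetical rather than anything Connect provides.

```python
def partitions_changed(previous, current):
    """Compare the sets of source partitions themselves, not just their sizes."""
    return set(previous) != set(current)


# Between two scans one matching table was dropped and another was added.
last_scan = ["orders", "orders_archive", "orders_eu"]
next_scan = ["orders", "orders_eu", "orders_us"]

same_count = len(last_scan) == len(next_scan)
print(same_count)                                 # True
print(partitions_changed(last_scan, next_scan))   # True
```

Even with the set comparison, whether the already-assigned tables stay on the same tasks remains up to the connector.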
> >>>> >> >
> >>>> >> > A general comment I should make here (not necessarily for your
> >>>> benefit
> >>>> >> but
> >>>> >> > for anyone following along) is that it's important to keep in
> mind
> >>>> that
> >>>> >> > "source partitions" in Kafka Connect aren't Kafka topic
> partitions
> >>>> >> (well,
> >>>> >> > unless your connector is designed to replicate data across Kafka
> >>>> >> clusters
> >>>> >> > like MirrorMaker 2). As a result, we have to rely on
> >>>> developer-written
> >>>> >> code
> >>>> >> > to a greater extent to define what the source partitions for a
> >>>> source
> >>>> >> > connector are, and how to divvy up that work amongst tasks.
> >>>> >> >
> >>>> >> > Hope this helps! If you have any further questions please hit me
> >>>> with
> >>>> >> them;
> >>>> >> > I doubt you'll be the only one wondering about these things.
> >>>> >> >
> >>>> >> > Cheers,
> >>>> >> >
> >>>> >> > Chris
> >>>> >> >
> >>>> >> > On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang <
> wangg...@gmail.com>
> >>>> >> wrote:
> >>>> >> >
> >>>> >> > > Thanks Chris, yeah I think I agree with you that this does not
> >>>> >> > necessarily
> >>>> >> > > have to be in the scope of this KIP.
> >>>> >> > >
> >>>> >> > > My understanding was that the source partitions -> tasks are
> not
> >>>> >> static
> >>>> >> > but
> >>>> >> > > dynamic, but they are only changed when either the number of
> >>>> >> partitions
> >>>> >> > > changed or "tasks.max" config changed (please correct me if I'm
> >>>> >> wrong),
> >>>> >> > so
> >>>> >> > > what I'm thinking is that we can try to detect if either of these
> >>>> things
> >>>> >> > > happens, and if they do not happen we can assume the mapping
> from
> >>>> >> > > partitions -> tasks does not change --- of course this requires
> >>>> some
> >>>> >> > > extension on the API, aligned with what you said. I would like
> >>>> to make
> >>>> >> > sure
> >>>> >> > > that my understanding here is correct :)
> >>>> >> > >
> >>>> >> > > Guozhang
> >>>> >> > >
> >>>> >> > >
> >>>> >> > > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <
> >>>> chr...@confluent.io>
> >>>> >> > > wrote:
> >>>> >> > >
> >>>> >> > > > Hi Guozhang,
> >>>> >> > > >
> >>>> >> > > > Thanks for taking a look, and for your suggestion!
> >>>> >> > > >
> >>>> >> > > > I think there is room for more intelligent fencing
> strategies,
> >>>> but I
> >>>> >> > > think
> >>>> >> > > > that it'd have to be more nuanced than one based on
> >>>> task->worker
> >>>> >> > > > assignments. Connectors can arbitrarily reassign source
> >>>> partitions
> >>>> >> > (such
> >>>> >> > > as
> >>>> >> > > > database tables, or Kafka topic partitions) across tasks, so
> >>>> even if
> >>>> >> > the
> >>>> >> > > > assignment of tasks across workers remains unchanged, the
> >>>> >> assignment of
> >>>> >> > > > source partitions across those workers might. Connect doesn't
> >>>> do any
> >>>> >> > > > inspection of task configurations at the moment, and without
> >>>> >> expansions
> >>>> >> > > to
> >>>> >> > > > the Connector/Task API, it'd likely be impossible to get
> >>>> information
> >>>> >> > from
> >>>> >> > > > tasks about their source partition assignments. With that in
> >>>> mind, I
> >>>> >> > > think
> >>>> >> > > > we may want to leave the door open for more intelligent task
> >>>> fencing
> >>>> >> > but
> >>>> >> > > > not include that level of optimization at this stage. Does
> that
> >>>> >> sound
> >>>> >> > > fair
> >>>> >> > > > to you?
> >>>> >> > > >
> >>>> >> > > > There is one case that I've identified where we can cheaply
> >>>> optimize
> >>>> >> > > right
> >>>> >> > > > now: single-task connectors, such as the Debezium CDC source
> >>>> >> > connectors.
> >>>> >> > > If
> >>>> >> > > > a connector is configured at some point with a single task,
> >>>> then
> >>>> >> some
> >>>> >> > > other
> >>>> >> > > > part of its configuration is altered but the single-task
> aspect
> >>>> >> > remains,
> >>>> >> > > > the leader doesn't have to worry about fencing out the older
> >>>> task as
> >>>> >> > the
> >>>> >> > > > new task's producer will do that automatically. In this case,
> >>>> the
> >>>> >> > leader
> >>>> >> > > > can skip the producer fencing round and just write the new
> task
> >>>> >> count
> >>>> >> > > > record straight to the config topic. I've added this case to
> >>>> the
> >>>> >> KIP;
> >>>> >> > if
> >>>> >> > > it
> >>>> >> > > > overcomplicates things I'm happy to remove it, but seeing as
> it
> >>>> >> serves
> >>>> >> > a
> >>>> >> > > > real use case and comes fairly cheap, I figured it'd be best
> to
> >>>> >> include
> >>>> >> > > > now.
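
The single-task shortcut described above amounts to a small decision rule, sketched below. The function name and parameters are hypothetical; this is not the leader's actual code, only an illustration of the condition stated in the message.

```python
def needs_fencing_round(previous_task_count, new_task_count):
    """Return False only when the connector had one task and still has one.

    In that case, per the message above, the new task's producer fences out
    the old one automatically, so the leader can skip the explicit fencing
    round and write the new task count record straight to the config topic.
    """
    return not (previous_task_count == 1 and new_task_count == 1)


print(needs_fencing_round(1, 1))  # False
print(needs_fencing_round(1, 3))  # True
print(needs_fencing_round(3, 1))  # True
```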
> >>>> >> > > >
> >>>> >> > > > Thanks again for your feedback; if you have other thoughts
> I'd
> >>>> love
> >>>> >> to
> >>>> >> > > hear
> >>>> >> > > > them!
> >>>> >> > > >
> >>>> >> > > > Cheers,
> >>>> >> > > >
> >>>> >> > > > Chris
> >>>> >> > > >
> >>>> >> > > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <
> >>>> chr...@confluent.io>
> >>>> >> > > wrote:
> >>>> >> > > >
> >>>> >> > > > > Hi Gwen,
> >>>> >> > > > >
> >>>> >> > > > > Thanks for the feedback!
> >>>> >> > > > >
> >>>> >> > > > > 0.
> >>>> >> > > > > That's a great point; I've updated the motivation section
> >>>> with
> >>>> >> that
> >>>> >> > > > > rationale.
> >>>> >> > > > >
> >>>> >> > > > > 1.
> >>>> >> > > > > This enables safe "hard downgrades" of clusters where,
> >>>> instead of
> >>>> >> > just
> >>>> >> > > > > disabling exactly-once support on each worker, each worker
> is
> >>>> >> rolled
> >>>> >> > > back
> >>>> >> > > > > to an earlier version of the Connect framework that doesn't
> >>>> >> support
> >>>> >> > > > > per-connector offsets topics altogether. Those workers
> would
> >>>> go
> >>>> >> back
> >>>> >> > to
> >>>> >> > > > all
> >>>> >> > > > > using a global offsets topic, and any offsets stored in
> >>>> >> per-connector
> >>>> >> > > > > topics would be lost to those workers. This would cause a
> >>>> large
> >>>> >> > number
> >>>> >> > > of
> >>>> >> > > > > duplicates to flood the downstream system. While
> technically
> >>>> >> > > permissible
> >>>> >> > > > > given that the user in this case will have knowingly
> >>>> switched to a
> >>>> >> > > > version
> >>>> >> > > > > of the Connect framework that doesn't support exactly-once
> >>>> source
> >>>> >> > > > > connectors (and is therefore susceptible to duplicate
> >>>> delivery of
> >>>> >> > > > records),
> >>>> >> > > > > the user experience in this case could be pretty bad. A
> >>>> similar
> >>>> >> > > situation
> >>>> >> > > > > is if users switch back from per-connector offsets topics
> to
> >>>> the
> >>>> >> > global
> >>>> >> > > > > offsets topic.
> >>>> >> > > > > I've tried to make this more clear in the KIP by linking to
> >>>> the
> >>>> >> "Hard
> >>>> >> > > > > downgrade" section from the proposed design, and by
> >>>> expanding on
> >>>> >> the
> >>>> >> > > > > rationale provided for redundant global offset writes in
> the
> >>>> "Hard
> >>>> >> > > > > downgrade" section. Let me know if you think this could be
> >>>> >> improved
> >>>> >> > or
> >>>> >> > > > > think a different approach is warranted.
> >>>> >> > > > >
> >>>> >> > > > > 2.
> >>>> >> > > > > I think the biggest difference between Connect and Streams
> >>>> comes
> >>>> >> from
> >>>> >> > > the
> >>>> >> > > > > fact that Connect allows users to create connectors that
> >>>> target
> >>>> >> > > different
> >>>> >> > > > > Kafka clusters on the same worker. This hasn't been a
> >>>> problem in
> >>>> >> the
> >>>> >> > > past
> >>>> >> > > > > because workers use two separate producers to write offset
> >>>> data
> >>>> >> and
> >>>> >> > > > source
> >>>> >> > > > > connector records, but in order to write offsets and
> records
> >>>> in
> >>>> >> the
> >>>> >> > > same
> >>>> >> > > > > transaction, it becomes necessary to use a single producer,
> >>>> which
> >>>> >> > also
> >>>> >> > > > > requires that the internal offsets topic be hosted on the
> >>>> same
> >>>> >> Kafka
> >>>> >> > > > > cluster that the connector is targeting.
> >>>> >> > > > > This may sound like a niche use case but it was actually
> one
> >>>> of
> >>>> >> the
> >>>> >> > > > > driving factors behind KIP-458 (
> >>>> >> > > > >
> >>>> >> > > >
> >>>> >> > >
> >>>> >> >
> >>>> >>
> >>>>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
> >>>> >> > > > ),
> >>>> >> > > > > and it's a feature that we rely on heavily today.
> >>>> >> > > > > If there's too much going on in this KIP and we'd prefer to
> >>>> drop
> >>>> >> > > support
> >>>> >> > > > > for running that type of setup with exactly-once source
> >>>> connectors
> >>>> >> > for
> >>>> >> > > > now,
> >>>> >> > > > > I can propose this in a separate KIP. I figured it'd be
> best
> >>>> to
> >>>> >> get
> >>>> >> > > this
> >>>> >> > > > > out of the way with the initial introduction of
> exactly-once
> >>>> >> source
> >>>> >> > > > support
> >>>> >> > > > > in order to make adoption by existing Connect users as
> >>>> seamless as
> >>>> >> > > > > possible, and since we'd likely have to address this issue
> >>>> before
> >>>> >> > being
> >>>> >> > > > > able to utilize the feature ourselves.
> >>>> >> > > > > I switched around the ordering of the "motivation" section
> >>>> for
> >>>> >> > > > > per-connector offsets topics to put the biggest factor
> >>>> first, and
> >>>> >> > > called
> >>>> >> > > > it
> >>>> >> > > > > out as the major difference between Connect and Streams in
> >>>> this
> >>>> >> case.
> >>>> >> > > > >
> >>>> >> > > > > 3.
> >>>> >> > > > > Fair enough, after giving it a little more thought I agree
> >>>> that
> >>>> >> > > allowing
> >>>> >> > > > > users to shoot themselves in the foot is a bad idea here.
> >>>> There's
> >>>> >> > also
> >>>> >> > > > some
> >>>> >> > > > > precedent for handling this with the "enable.idempotence"
> >>>> and "
> >>>> >> > > > > transactional.id" producer properties; if you specify a
> >>>> >> > transactional
> >>>> >> > > ID
> >>>> >> > > > > but don't specify a value for idempotence, the producer
> just
> >>>> does
> >>>> >> the
> >>>> >> > > > right
> >>>> >> > > > > thing for you by enabling idempotence and emitting a log
> >>>> message
> >>>> >> > > letting
> >>>> >> > > > > you know that it's done so. I've adjusted the proposed
> >>>> behavior to
> >>>> >> > try
> >>>> >> > > to
> >>>> >> > > > > use a similar approach; let me know what you think.
> >>>> >> > > > > There is the potential gap here where, sometime in the
> >>>> future, a
> >>>> >> > third
> >>>> >> > > > > accepted value for the "isolation.level" property is added
> >>>> to the
> >>>> >> > > > consumer
> >>>> >> > > > > API and users will be unable to use that new value for
> their
> >>>> >> worker.
> >>>> >> > > But
> >>>> >> > > > > the likelihood of footgunning seems much greater than this
> >>>> >> scenario,
> >>>> >> > > and
> >>>> >> > > > we
> >>>> >> > > > > can always address expansions to the consumer API with
> >>>> changes to
> >>>> >> the
> >>>> >> > > > > Connect framework as well if/when that becomes necessary.
> >>>> >> > > > > I've also added a similar note to the source task's
> >>>> transactional
> >>>> >> ID
> >>>> >> > > > > property; user overrides of it will also be disabled.
> >>>> >> > > > >
> >>>> >> > > > > 4.
> >>>> >> > > > > Yeah, that's mostly correct. I tried to touch on this in the
> >>>> >> > > > > "Motivation" section with this bit:
> >>>> >> > > > >     > The Connect framework periodically writes source task offsets
> >>>> >> > > > >     > to an internal Kafka topic at a configurable interval, once the
> >>>> >> > > > >     > source records that they correspond to have been successfully
> >>>> >> > > > >     > sent to Kafka.
> >>>> >> > > > > I've expanded on this in the "Offset (and record) writes" section,
> >>>> >> > > > > and I've tweaked the "Motivation" section a little bit to add a link
> >>>> >> > > > > to the relevant config property and to make the language a little
> >>>> >> > > > > more accurate.
> >>>> >> > > > >
> >>>> >> > > > > 5.
> >>>> >> > > > > This isn't quite as bad as stop-the-world; more like
> >>>> >> > > > > stop-the-connector. If a worker is running a dozen connectors and
> >>>> >> > > > > one of those (that happens to be a source) is reconfigured, only the
> >>>> >> > > > > tasks for that connector will be preemptively halted, and all other
> >>>> >> > > > > tasks and connectors will continue running. This is pretty close to
> >>>> >> > > > > the current behavior with incremental rebalancing; the only
> >>>> >> > > > > difference is that, instead of waiting for the rebalance to complete
> >>>> >> > > > > before halting the tasks for that connector, the worker will halt
> >>>> >> > > > > them in preparation for the rebalance. This increases the time that
> >>>> >> > > > > the tasks will be down for, but is necessary if we want to give
> >>>> >> > > > > tasks time to gracefully shut down before being fenced out by the
> >>>> >> > > > > leader, and shouldn't impact availability too much as it's only
> >>>> >> > > > > triggered on connector reconfiguration and rebalances generally
> >>>> >> > > > > don't take that long with the new incremental protocol.
> >>>> >> > > > >
> >>>> >> > > > > 6.
> >>>> >> > > > > Ah, thanks for the catch! That's a good point. I've adjusted the
> >>>> >> > > > > proposal to treat the worker's transactional ID like the consumer's
> >>>> >> > > > > isolation level and the source task's transactional ID properties:
> >>>> >> > > > > it can't be modified by users and, if an attempt is made to do so,
> >>>> >> > > > > the override will be discarded after logging a message to that
> >>>> >> > > > > effect. More footguns removed, hooray!
> >>>> >> > > > >
> >>>> >> > > > > Thanks again for taking a look; if you have any other questions or
> >>>> >> > > > > suggestions I'm all ears.
> >>>> >> > > > >
> >>>> >> > > > > Cheers,
> >>>> >> > > > >
> >>>> >> > > > > Chris
> >>>> >> > > > >
> >>>> >> > > > > p.s. - Guozhang--I saw your email; response to you coming next :)
> >>>> >> > > > >
> >>>> >> > > > > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com>
> >>>> >> > > > > wrote:
> >>>> >> > > > >
> >>>> >> > > > >> Hello Chris,
> >>>> >> > > > >>
> >>>> >> > > > >> Thanks for the great write-up! I was mainly reviewing the admin
> >>>> >> > > > >> fenceProducers API of the KIP. I think it makes sense overall. I'm
> >>>> >> > > > >> just wondering if we can go one step further: instead of forcing
> >>>> >> > > > >> all producers of the previous generation to be fenced out, could we
> >>>> >> > > > >> still achieve a "partial rebalance" by first generating the new
> >>>> >> > > > >> assignment, and then, based on the new assignment, only fencing out
> >>>> >> > > > >> producers involved in tasks that are indeed migrated? Just a wild
> >>>> >> > > > >> thought to bring up for debate.
> >>>> >> > > > >>
> >>>> >> > > > >> Guozhang
> >>>> >> > > > >>
> >>>> >> > > > >>
> >>>> >> > > > >>
> >>>> >> > > > >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io>
> >>>> >> > > > >> wrote:
> >>>> >> > > > >>
> >>>> >> > > > >> > Hey Chris,
> >>>> >> > > > >> >
> >>>> >> > > > >> > Thank you for the proposal. Few questions / comments:
> >>>> >> > > > >> >
> >>>> >> > > > >> > 0. It may be worth pointing out in the motivation section that
> >>>> >> > > > >> > source-connector exactly once is more important than sink
> >>>> >> > > > >> > connector exactly once, since many target systems will have
> >>>> >> > > > >> > unique key constraint mechanisms that will prevent duplicates.
> >>>> >> > > > >> > Kafka does not have any such constraints, so without this
> >>>> >> > > > >> > KIP-618, exactly once won't be possible.
> >>>> >> > > > >> > 1. I am not clear why we need the worker to async copy offsets
> >>>> >> > > > >> > from the connector-specific offset topic to a global offsets
> >>>> >> > > > >> > topic.
> >>>> >> > > > >> > 2. While the reasoning you have for offset topic per connector
> >>>> >> > > > >> > appears sound, it doesn't add up with the use of transactions in
> >>>> >> > > > >> > KafkaStreams. My understanding is that KafkaStreams uses shared
> >>>> >> > > > >> > offsets topic with all the other consumers, and (apparently)
> >>>> >> > > > >> > corrupting data and delays by other tenants is a non-issue.
> >>>> >> > > > >> > Perhaps you can comment on how Connect is different? In general
> >>>> >> > > > >> > much of the complexity in the KIP is related to the separate
> >>>> >> > > > >> > offset topic, and I'm wondering if this can be avoided. The
> >>>> >> > > > >> > migration use-case is interesting, but not related to
> >>>> >> > > > >> > exactly-once and can be handled separately.
> >>>> >> > > > >> > 3. Allowing users to override the isolation level for the offset
> >>>> >> > > > >> > reader, even when exactly-once is enabled, thereby disabling
> >>>> >> > > > >> > exactly-once in a non-obvious way. I get that connect usually
> >>>> >> > > > >> > allows users to shoot themselves in the foot, but are there any
> >>>> >> > > > >> > actual benefits for allowing it in this case? Maybe it is better
> >>>> >> > > > >> > if we don't? I don't find the argument that we always did this
> >>>> >> > > > >> > to be particularly compelling.
> >>>> >> > > > >> > 4. It isn't stated explicitly, but it sounds like connect or
> >>>> >> > > > >> > source connectors already have some batching mechanism, and that
> >>>> >> > > > >> > transaction boundaries will match the batches (i.e. each batch
> >>>> >> > > > >> > will be a transaction?). If so, worth being explicit.
> >>>> >> > > > >> > 5. "When a rebalance is triggered, before (re-)joining the
> >>>> >> > > > >> > cluster group, all workers will preemptively stop all tasks of
> >>>> >> > > > >> > all source connectors for which task configurations are present
> >>>> >> > > > >> > in the config topic after the latest task count record" - how
> >>>> >> > > > >> > will this play with the incremental rebalances? isn't this
> >>>> >> > > > >> > exactly the stop-the-world rebalance we want to avoid?
> >>>> >> > > > >> > 6. "the worker will instantiate a transactional producer whose
> >>>> >> > > > >> > transactional ID is, by default, the group ID of the cluster
> >>>> >> > > > >> > (but may be overwritten by users using the transactional.id
> >>>> >> > > > >> > worker property)" - If users change transactional.id property,
> >>>> >> > > > >> > zombie leaders won't get fenced (since they will have an older
> >>>> >> > > > >> > and different transactional id)
> >>>> >> > > > >> >
> >>>> >> > > > >> > Thanks,
> >>>> >> > > > >> >
> >>>> >> > > > >> > Gwen
> >>>> >> > > > >> >
> >>>> >> > > > >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton
> >>>> >> > > > >> > <chr...@confluent.io> wrote:
> >>>> >> > > > >> > >
> >>>> >> > > > >> > > Hi all,
> >>>> >> > > > >> > >
> >>>> >> > > > >> > > I know it's a busy time with the upcoming 2.6 release and I
> >>>> >> > > > >> > > don't expect this to get a lot of traction until that's done,
> >>>> >> > > > >> > > but I've published a KIP for allowing atomic commit of offsets
> >>>> >> > > > >> > > and records for source connectors and would appreciate your
> >>>> >> > > > >> > > feedback:
> >>>> >> > > > >> > >
> >>>> >> > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> >>>> >> > > > >> > >
> >>>> >> > > > >> > > This feature should make it possible to implement source
> >>>> >> > > > >> > > connectors with exactly-once delivery guarantees, and even
> >>>> >> > > > >> > > allow a wide range of existing source connectors to provide
> >>>> >> > > > >> > > exactly-once delivery guarantees with no changes required.
> >>>> >> > > > >> > >
> >>>> >> > > > >> > > Cheers,
> >>>> >> > > > >> > >
> >>>> >> > > > >> > > Chris
> >>>> >> > > > >> >
> >>>> >> > > > >> >
> >>>> >> > > > >> >
> >>>> >> > > > >> > --
> >>>> >> > > > >> > Gwen Shapira
> >>>> >> > > > >> > Engineering Manager | Confluent
> >>>> >> > > > >> > 650.450.2760 | @gwenshap
> >>>> >> > > > >> > Follow us: Twitter | blog
> >>>> >> > > > >> >
> >>>> >> > > > >>
> >>>> >> > > > >>
> >>>> >> > > > >> --
> >>>> >> > > > >> -- Guozhang
> >>>> >> > > > >>
> >>>> >> > > > >
> >>>> >> > > >
> >>>> >> > >
> >>>> >> > >
> >>>> >> > > --
> >>>> >> > > -- Guozhang
> >>>> >> > >
> >>>> >> >
> >>>> >>
> >>>> >>
> >>>> >> --
> >>>> >> -- Guozhang
> >>>> >>
> >>>> >
> >>>>
> >>>
>
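
The override handling discussed in points 3 and 6 of the quoted thread (user-supplied values for framework-controlled properties are discarded after logging a message) can be sketched roughly as follows. This is a minimal illustration in Python, not Connect's actual code: the function name `apply_framework_overrides`, the `FRAMEWORK_CONTROLLED` table, and the choice of `read_committed` as the forced value for `isolation.level` are assumptions made for the example.

```python
import logging

log = logging.getLogger("connect.sketch")

# Hypothetical table of client properties that the framework controls when
# exactly-once source support is enabled. The forced value is an assumption.
FRAMEWORK_CONTROLLED = {
    "isolation.level": "read_committed",
}


def apply_framework_overrides(user_props, controlled):
    """Return a copy of user_props with framework-controlled keys forced.

    A conflicting user-supplied value is discarded, and a message is logged
    so the user knows the override was ignored.
    """
    effective = dict(user_props)
    for key, forced in controlled.items():
        if key in effective and effective[key] != forced:
            log.warning(
                "Ignoring user-supplied %s=%s; using %s instead",
                key, effective[key], forced,
            )
        effective[key] = forced
    return effective


user = {"isolation.level": "read_uncommitted", "group.id": "offsets-reader"}
effective = apply_framework_overrides(user, FRAMEWORK_CONTROLLED)
```

Here `effective` keeps `group.id` untouched and carries the forced `isolation.level`, while `user` itself is left unmodified. The same pattern would apply to a transactional ID, with the forced value computed by the framework instead of taken from a static table.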

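
Guozhang's "partial rebalance" suggestion in the quoted thread (compute the new assignment first, then fence only the producers for tasks that actually moved) amounts to a diff of two assignments. A minimal Python sketch, assuming assignments are plain maps from task ID to worker ID; `tasks_to_fence` and the example IDs are hypothetical, and treating tasks that disappear from the new assignment as needing fencing is an assumption of this sketch rather than something stated in the thread.

```python
def tasks_to_fence(old_assignment, new_assignment):
    """Return the IDs of tasks whose owning worker differs between assignments.

    Tasks that keep the same worker are left alone. Tasks that are missing
    from the new assignment are included (an assumption of this sketch), and
    tasks that are brand new have no previous producer to fence.
    """
    to_fence = set()
    for task_id, old_worker in old_assignment.items():
        if new_assignment.get(task_id) != old_worker:
            to_fence.add(task_id)
    return to_fence


old = {"conn-0": "worker-a", "conn-1": "worker-a", "conn-2": "worker-b"}
new = {"conn-0": "worker-a", "conn-1": "worker-b", "conn-2": "worker-b"}
moved = tasks_to_fence(old, new)  # only conn-1 changed owner
```

With the example maps, only `conn-1` would have its previous producer fenced; `conn-0` and `conn-2` stay on their workers and keep running.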