Hey, Chris.

Thanks again for all your hard work figuring out this complicated problem.
While there are still issues to iron out, at the moment it seems pretty
sensible.

Here's my follow up email with additional comments/questions. I've
organized these by section to help provide context, and I've numbered the
questions to make it easier to reply/discuss. I apologize for the length of
this email -- I thought it better to do this all at once rather than string
out in several emails.

Motivation
1. This section does not mention another important source of duplicate
records: producer retries. Using the idempotent producer can prevent
duplicates due to retries, but it can't help with anything else. Perhaps
it's worth mentioning?

Public Interfaces
2. Have we considered `exactly.once.source.enabled=true` by default in the
worker config? We're currently on track for considering this KIP for
inclusion in AK 3.0, so we are allowed backward incompatible changes.
3. It is possible (but maybe unlikely) that existing source connectors
might already have a connector config property that matches the proposed
new connector config `offsets.storage.topic`. It's good that this matches
the worker's config, but were any other property names considered?
4. I see that one of the rejected alternatives was a connector-level option
to disable EOS. For example, what happens if some source connectors don't
work well with EOS (e.g., the transaction blocking behavior of a multi-task
source connector is unacceptable)? What would we say if a user wants to use
EOS for some source connectors but not others? Is it really viable to not
allow that?
5. The worker source task will create a consumer for the offset topic used
by the connector. Currently, Connect does not use or do anything with
`consumer.override.*` properties, but won't they now likely be needed with
a source connector using EOS? What happens with existing source connector
configurations that don't define these?

Proposed Changes
Offset reads
6. The KIP says "a message will be logged notifying them of this fact."
Should it be more specific, namely "a warning message..."?

SourceTask record commit API
7. The second paragraph talks about why we cannot guarantee the methods be
invoked after every successful commit, and I think that makes sense because
there is no other practical way to handle worker failures. It does seem
like it would be more useful to describe the expectation about calling
these methods during normal task shutdown. Specifically, should connector
developers expect that these methods will be called following a successful
commit immediately prior to stopping the task (barring worker failures)?

Per-connector offsets topic
This section says:

> Finally, it allows users to limit the effect that hanging transactions on
> an offsets topic will have. If tasks A and B use the same offsets topic,
> and task A initiates a transaction on that offsets topic right before task
> B starts up, then task A dies suddenly without committing its transaction,
> task B will have to wait for that transaction to time out before it can
> read to the end of the offsets topic. If the transaction timeout is set
> very high for task A (to accommodate bursts of high throughput, for
> example), this will block task B from processing any data for a long time.
> Although this scenario may be unavoidable in some cases, using a dedicated
> offsets topic for each connector should allow cluster administrators to
> isolate the blast radius of a hanging transaction on an offsets topic. This
> way, although tasks of the same connector may still interfere with each
> other, they will at least not interfere with tasks of other connectors.
> This should be sufficient for most multitenant environments.


8. Won't there also be transaction contention just within a single
connector using multiple tasks? Even if that connector has all offsets go
to a dedicated topic (separate from all other connectors), then isn't it
still possible/likely that a transaction from task 1 blocks a transaction
from task 2? How will this affect latency perceived by consumers (e.g., the
records from task 2's tranasction don't become visible until the
transactions from task 1 and task 2 commit)?

Migration
9. This section says:

> the worker will read to the end of the offsets topic before starting the
> task. If there are no offsets for the connector present in the topic and
> there is no sentinel value...

This sounds like the worker will know that a sentinel value is expected,
but IIUC a worker cannot know that. Instead, the worker consuming the topic
must read to the end and expect _either_ a sentinel value _or_ committed
offsets. Can the text make this more clear?

Task count records
10. This section says "then workers will not bring up tasks for the
connector." What does this mean? Does it mean the tasks fail? How do users
recover from this? I think the "Addressed failure/degradation scenarios"
section tries to address some of these, but any link is implicit.

Source task startup
11. This section says "Otherwise, the worker will emit a warning log
message and refuse to bring the task up until a rebalance occurs." What
exactly does "refuse to bring the task up" mean? Also, "until a rebalance
occurs" suggests that another rebalance will be forthcoming? What would
cause that subsequent rebalance? Isn't it possible that there is no
subsequent rebalance for a potentially significant amount of time? Again,
any link between this section and the "Addressed failure/degradation
scenarios" section is implicit.

12. Also, "Once a worker has instantiated a producer for a source task, it
will read to the end of the config topic once more, and if a new set of
task configurations for that connector has been generated, it will abort
startup of the task." What exactly does "abort startup of the task" mean?
How do users recover from this? Again, I more explicit behavior would be
helpful.

Addressed failure/degradation scenarios
Cannot fence out producer during rebalance
13. This section says:

> The leader may be unable to fence out producers during a rebalance if, for
> example, it is unable to reach or locate a transaction coordinator.

Isn't it possible given N producer transactional IDs for there to be up to
N transaction coordinators? Does this reduce the likelihood that it can be
completed successfully? What risk does this play in rebalances being
unsuccessful?

Rolling upgrade
14. This section says:

> Users can stop each worker, upgrade to a later version if necessary, set
> exactly.once.source.enabled to true in the config, then restart, one by one.

Nit: This is not as clear/precise as it probably needs to be. A subtle
change makes this more precise:

> Users can then do the following for each worker, one by one: stop the
> worker, upgrade to a later version if necessary, set
> exactly.once.source.enabled to true in the config, then restart.


15. A bit later in the same section:

> This will have the effect that, until the leader is upgraded, no upgraded
> workers will start source tasks.

IIUC, the net effect is that rolling upgrades don't really work for source
connectors. Wouldn't it just be easier and maybe faster to stop and restart
the whole cluster? Also, what are the states of the source tasks during
this time?

16. And a bit later:

> Some of these offsets may be out-of-date or not match the ones in the
> connector’s separate offsets topic

The "not match" is probably looser than necessary. What about "older than"?


Future Work
Standalone mode support
17. The fact that EOS only works for distributed Connect was mentioned only
once earlier in the document, but I missed it on my first pass and was a
bit surprised when I read this. Perhaps we should be clearer on this
earlier on in the document, perhaps by adding a "Limitations" section.
There are probably several limitations:

   - Only Connect's distributed worker supports EOS.
   - EOS can be enabled only for all source connectors, and not only for
   some.
   - EOS works only for source connectors that assign source partitions to
   at most one task at a time.

I suspect this last limitation is likely to not be that severe, since such
a connector would have non-deterministic restart behavior with the current
Connect runtime, as each task would commit different source offsets using
the same source partitions. Of course, a following up question
then becomes: What is the impact of any source connector that doesn't
adhere to this last limitation? (This is kind of addressed in the KIP, but
may need to be updated/moved if we add the Limitations section.

Best regards,

Randall


On Thu, Mar 18, 2021 at 5:26 PM Randall Hauch <rha...@apache.org> wrote:

> Chris,
>
> Thanks for the well-written KIP and the more recent updates. It's exciting
> to envision EOS for Connect source connectors!
>
> I'll follow up this email with another that goes into specific comments
> and questions not already touched on by others. This email addresses
> comments and questions previously discussed on this thread.
>
> I'd like to +1 your response to Gwen's question about connector-specific
> offsets topic (#2 in her email). But importantly, a connector can override
> any producer settings, including Kafka credentials that don't have
> permissions on the internal Connect offsets topic. For this reason, I think
> it's necessary for this KIP to address those scenarios.
>
> Gwen's #4 question was about how transactions related to the batches
> source connector tasks return from their `poll()` method, and you followed
> up by clarifying the document to make this more clear. IIUC, the KIP
> suggests still using the existing `offset.flush.interval.ms` property and
> then committing transactions based (solely?) upon the value of the
> property. This would seem to make the transactions based upon time rather
> than record count. Is that really the intent? Did you consider using one
> transaction per "batch" of records, whatever number of records that means?
> One issue with the time-based boundary is that `offset.flush.interval.ms`
> defaults to 60 seconds, and that could mean many hundreds of thousands of
> records on a high throughput task. (BTW, I do agree with the KIP's
> rejected alternative to not expose to the task or let the task define the
> transaction boundaries.)
>
> Gwen's #5 question was about rebalances of a source connector. Your answer
> made sense, but while the KIP talks about source tasks being stopped before
> the connector is rebalanced, I'm wondering about the delay or other impact
> this has on the rebalance itself. The "Cannot fence out producers during
> rebalance" section talks about fencing out producers, but IIUC this is not
> the same as:
>
> ... all workers will preemptively stop all tasks of all source connectors
> for which task configurations.
>
>
> If this is already addressed in the KIP, please just point me to that
> section.
>
> Finally, regarding Guozhang's question about "partial rebalances" rather
> than fencing out all producers. In your initial response, you said the
> following:
>
> Connectors can arbitrarily reassign source partitions (such as
> database tables, or Kafka topic partitions) across tasks, so even if the
> assignment of tasks across workers remains unchanged, the assignment of
> source partitions across those workers might.
>
>
> This is a very important point. In fact, there are definitely connectors
> that shuffle around "source partition" assignments to different tasks, and
> IMO we have to take that into consideration because Connect provides no
> guidance on this and because there are good reasons to do this. So I'm glad
> the KIP takes this into account. This is actually one of the issues I ran
> into during an earlier attempt to POC EOS for source connectors, and the
> ability to force fencing is a novel (albeit heavy-handed) way to deal with
> that.
>
> As I said, I'll follow up in subsequent emails on other specific questions
> I have.
>
> Best regards,
>
> Randall
>
> On Mon, Feb 22, 2021 at 6:45 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
>> This is a great explanation, thank you!
>>
>> On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton <chr...@confluent.io>
>> wrote:
>>
>> > Hi Guozhang,
>> >
>> > Your understanding should be correct in most cases, but there are two
>> finer
>> > points that may interest you:
>> >
>> > 1. It's technically dependent on the implementation of the connector and
>> > how it chooses to allocate source partitions across its tasks; even if
>> the
>> > number of tasks and source partitions remains completely unchanged, a
>> > connector may still choose to shuffle around the partition->task
>> > allocation. I can't think of any cases where this might happen off the
>> top
>> > of my head, but it seemed worth sharing given the educational nature of
>> the
>> > question.
>> > 2. It's also possible that the number of source partitions remains
>> > unchanged, but the set of source partitions changes. One case where this
>> > might happen is with a database connector monitoring for tables that
>> match
>> > a given regex every five minutes; if a table that matched that regex
>> during
>> > the last scan got assigned to a task and then dropped, and then another
>> > table that matched the regex got added before the next scan, the
>> connector
>> > would see the same number of tables, but the actual set would be
>> different.
>> > At this point, it would again be connector-dependent for whether the
>> > already-assigned tables stayed assigned to the same tasks. Is anyone
>> else
>> > reminded of the various consumer partition assignment strategies at this
>> > point?
>> >
>> > A general comment I should make here (not necessarily for your benefit
>> but
>> > for anyone following along) is that it's important to keep in mind that
>> > "source partitions" in Kafka Connect aren't Kafka topic partitions
>> (well,
>> > unless your connector is designed to replicate data across Kafka
>> clusters
>> > like MirrorMaker 2). As a result, we have to rely on developer-written
>> code
>> > to a greater extent to define what the source partitions for a source
>> > connector are, and how to divvy up that work amongst tasks.
>> >
>> > Hope this helps! If you have any further questions please hit me with
>> them;
>> > I doubt you'll be the only one wondering about these things.
>> >
>> > Cheers,
>> >
>> > Chris
>> >
>> > On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang <wangg...@gmail.com>
>> wrote:
>> >
>> > > Thanks Chris, yeah I think I agree with you that this does not
>> > necessarily
>> > > have to be in the scope of this KIP.
>> > >
>> > > My understanding was that the source partitions -> tasks are not
>> static
>> > but
>> > > dynamic, but they are only changed when either the number of
>> partitions
>> > > changed or "tasks.max" config changed (please correct me if I'm
>> wrong),
>> > so
>> > > what I'm thinking that we can try to detect if either of these things
>> > > happens, and if they do not happen we can assume the mapping from
>> > > partitions -> tasks does not change --- of course this requires some
>> > > extension on the API, aligned with what you said. I would like to make
>> > sure
>> > > that my understanding here is correct :)
>> > >
>> > > Guozhang
>> > >
>> > >
>> > > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <chr...@confluent.io>
>> > > wrote:
>> > >
>> > > > Hi Guozhang,
>> > > >
>> > > > Thanks for taking a look, and for your suggestion!
>> > > >
>> > > > I think there is room for more intelligent fencing strategies, but I
>> > > think
>> > > > that it'd have to be more nuanced than one based on task->worker
>> > > > assignments. Connectors can arbitrarily reassign source partitions
>> > (such
>> > > as
>> > > > database tables, or Kafka topic partitions) across tasks, so even if
>> > the
>> > > > assignment of tasks across workers remains unchanged, the
>> assignment of
>> > > > source partitions across those workers might. Connect doesn't do any
>> > > > inspection of task configurations at the moment, and without
>> expansions
>> > > to
>> > > > the Connector/Task API, it'd likely be impossible to get information
>> > from
>> > > > tasks about their source partition assignments. With that in mind, I
>> > > think
>> > > > we may want to leave the door open for more intelligent task fencing
>> > but
>> > > > not include that level of optimization at this stage. Does that
>> sound
>> > > fair
>> > > > to you?
>> > > >
>> > > > There is one case that I've identified where we can cheaply optimize
>> > > right
>> > > > now: single-task connectors, such as the Debezium CDC source
>> > connectors.
>> > > If
>> > > > a connector is configured at some point with a single task, then
>> some
>> > > other
>> > > > part of its configuration is altered but the single-task aspect
>> > remains,
>> > > > the leader doesn't have to worry about fencing out the older task as
>> > the
>> > > > new task's producer will do that automatically. In this case, the
>> > leader
>> > > > can skip the producer fencing round and just write the new task
>> count
>> > > > record straight to the config topic. I've added this case to the
>> KIP;
>> > if
>> > > it
>> > > > overcomplicates things I'm happy to remove it, but seeing as it
>> serves
>> > a
>> > > > real use case and comes fairly cheap, I figured it'd be best to
>> include
>> > > > now.
>> > > >
>> > > > Thanks again for your feedback; if you have other thoughts I'd love
>> to
>> > > hear
>> > > > them!
>> > > >
>> > > > Cheers,
>> > > >
>> > > > Chris
>> > > >
>> > > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <chr...@confluent.io>
>> > > wrote:
>> > > >
>> > > > > Hi Gwen,
>> > > > >
>> > > > > Thanks for the feedback!
>> > > > >
>> > > > > 0.
>> > > > > That's a great point; I've updated the motivation section with
>> that
>> > > > > rationale.
>> > > > >
>> > > > > 1.
>> > > > > This enables safe "hard downgrades" of clusters where, instead of
>> > just
>> > > > > disabling exactly-once support on each worker, each worker is
>> rolled
>> > > back
>> > > > > to an earlier version of the Connect framework that doesn't
>> support
>> > > > > per-connector offsets topics altogether. Those workers would go
>> back
>> > to
>> > > > all
>> > > > > using a global offsets topic, and any offsets stored in
>> per-connector
>> > > > > topics would be lost to those workers. This would cause a large
>> > number
>> > > of
>> > > > > duplicates to flood the downstream system. While technically
>> > > permissible
>> > > > > given that the user in this case will have knowingly switched to a
>> > > > version
>> > > > > of the Connect framework that doesn't support exactly-once source
>> > > > > connectors (and is therefore susceptible to duplicate delivery of
>> > > > records),
>> > > > > the user experience in this case could be pretty bad. A similar
>> > > situation
>> > > > > is if users switch back from per-connector offsets topics to the
>> > global
>> > > > > offsets topic.
>> > > > > I've tried to make this more clear in the KIP by linking to the
>> "Hard
>> > > > > downgrade" section from the proposed design, and by expanding on
>> the
>> > > > > rationale provided for redundant global offset writes in the "Hard
>> > > > > downgrade" section. Let me know if you think this could be
>> improved
>> > or
>> > > > > think a different approach is warranted.
>> > > > >
>> > > > > 2.
>> > > > > I think the biggest difference between Connect and Streams comes
>> from
>> > > the
>> > > > > fact that Connect allows users to create connectors that target
>> > > different
>> > > > > Kafka clusters on the same worker. This hasn't been a problem in
>> the
>> > > past
>> > > > > because workers use two separate producers to write offset data
>> and
>> > > > source
>> > > > > connector records, but in order to write offsets and records in
>> the
>> > > same
>> > > > > transaction, it becomes necessary to use a single producer, which
>> > also
>> > > > > requires that the internal offsets topic be hosted on the same
>> Kafka
>> > > > > cluster that the connector is targeting.
>> > > > > This may sound like a niche use case but it was actually one of
>> the
>> > > > > driving factors behind KIP-458 (
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
>> > > > ),
>> > > > > and it's a feature that we rely on heavily today.
>> > > > > If there's too much going on in this KIP and we'd prefer to drop
>> > > support
>> > > > > for running that type of setup with exactly-once source connectors
>> > for
>> > > > now,
>> > > > > I can propose this in a separate KIP. I figured it'd be best to
>> get
>> > > this
>> > > > > out of the way with the initial introduction of exactly-once
>> source
>> > > > support
>> > > > > in order to make adoption by existing Connect users as seamless as
>> > > > > possible, and since we'd likely have to address this issue before
>> > being
>> > > > > able to utilize the feature ourselves.
>> > > > > I switched around the ordering of the "motivation" section for
>> > > > > per-connector offsets topics to put the biggest factor first, and
>> > > called
>> > > > it
>> > > > > out as the major difference between Connect and Streams in this
>> case.
>> > > > >
>> > > > > 3.
>> > > > > Fair enough, after giving it a little more thought I agree that
>> > > allowing
>> > > > > users to shoot themselves in the foot is a bad idea here. There's
>> > also
>> > > > some
>> > > > > precedent for handling this with the "enable.idempotence" and "
>> > > > > transactional.id" producer properties; if you specify a
>> > transactional
>> > > ID
>> > > > > but don't specify a value for idempotence, the producer just does
>> the
>> > > > right
>> > > > > thing for you by enabling idempotence and emitting a log message
>> > > letting
>> > > > > you know that it's done so. I've adjusted the proposed behavior to
>> > try
>> > > to
>> > > > > use a similar approach; let me know what you think.
>> > > > > There is the potential gap here where, sometime in the future, a
>> > third
>> > > > > accepted value for the "isolation.level" property is added to the
>> > > > consumer
>> > > > > API and users will be unable to use that new value for their
>> worker.
>> > > But
>> > > > > the likelihood of footgunning seems much greater than this
>> scenario,
>> > > and
>> > > > we
>> > > > > can always address expansions to the consumer API with changes to
>> the
>> > > > > Connect framework as well if/when that becomes necessary.
>> > > > > I've also added a similar note to the source task's transactional
>> ID
>> > > > > property; user overrides of it will also be disabled.
>> > > > >
>> > > > > 4.
>> > > > > Yeah, that's mostly correct. I tried to touch on this in the
>> > > "Motivation"
>> > > > > section with this bit:
>> > > > >     > The Connect framework periodically writes source task
>> offsets
>> > to
>> > > an
>> > > > > internal Kafka topic at a configurable interval, once the source
>> > > records
>> > > > > that they correspond to have been successfully sent to Kafka.
>> > > > > I've expanded on this in the "Offset (and record) writes" section,
>> > and
>> > > > > I've tweaked the "Motivation" section a little bit to add a link
>> to
>> > the
>> > > > > relevant config property and to make the language a little more
>> > > accurate.
>> > > > >
>> > > > > 5.
>> > > > > This isn't quite as bad as stop-the-world; more like
>> > > stop-the-connector.
>> > > > > If a worker is running a dozen connectors and one of those (that
>> > > happens
>> > > > to
>> > > > > be a source) is reconfigured, only the tasks for that connector
>> will
>> > be
>> > > > > preemptively halted, and all other tasks and connectors will
>> continue
>> > > > > running. This is pretty close to the current behavior with
>> > incremental
>> > > > > rebalancing; the only difference is that, instead of waiting for
>> the
>> > > > > rebalance to complete before halting the tasks for that connector,
>> > the
>> > > > > worker will halt them in preparation for the rebalance. This
>> > increases
>> > > > the
>> > > > > time that the tasks will be down for, but is necessary if we want
>> to
>> > > give
>> > > > > tasks time to gracefully shut down before being fenced out by the
>> > > leader,
>> > > > > and shouldn't impact availability too much as it's only triggered
>> on
>> > > > > connector reconfiguration and rebalances generally don't take that
>> > long
>> > > > > with the new incremental protocol.
>> > > > >
>> > > > > 6.
>> > > > > Ah, thanks for the catch! That's a good point. I've adjusted the
>> > > proposal
>> > > > > to treat the worker's transactional ID like the consumer's
>> isolation
>> > > > level
>> > > > > and the source task's transactional ID properties; can't be
>> modified
>> > by
>> > > > > users and, if an attempt is made to do so, will be discarded after
>> > > > logging
>> > > > > a message to that effect. More footguns removed, hooray!
>> > > > >
>> > > > > Thanks again for taking a look; if you have any other questions or
>> > > > > suggestions I'm all ears.
>> > > > >
>> > > > > Cheers,
>> > > > >
>> > > > > Chris
>> > > > >
>> > > > > p.s. - Guozhang--I saw your email; response to you coming next :)
>> > > > >
>> > > > > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com
>> >
>> > > > wrote:
>> > > > >
>> > > > >> Hello Chris,
>> > > > >>
>> > > > >> Thanks for the great write-up! I was mainly reviewing the admin
>> > > > >> fenceProducers API of the KIP. I think it makes sense overall.
>> I'm
>> > > just
>> > > > >> wondering if we can go one step further, that instead of forcing
>> to
>> > > > fence
>> > > > >> out all producers of the previous generation, could we try to
>> > achieve
>> > > > >> "partial rebalance" still by first generate the new assignment,
>> and
>> > > then
>> > > > >> based on the new assignment only fence out producers involved in
>> > tasks
>> > > > >> that
>> > > > >> are indeed migrated? Just a wild thought to bring up for debate.
>> > > > >>
>> > > > >> Guozhang
>> > > > >>
>> > > > >>
>> > > > >>
>> > > > >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io
>> >
>> > > > wrote:
>> > > > >>
>> > > > >> > Hey Chris,
>> > > > >> >
>> > > > >> > Thank you for the proposal. Few questions / comments:
>> > > > >> >
>> > > > >> > 0. It may be worth pointing out in the motivation section that
>> > > > >> > source-connector exactly once is more important than sink
>> > connector
>> > > > >> > exactly once, since many target systems will have unique key
>> > > > >> > constraint mechanisms that will prevent duplicates. Kafka does
>> not
>> > > > >> > have any such constraints, so without this KIP-618, exactly
>> once
>> > > won't
>> > > > >> > be possible.
>> > > > >> > 1. I am not clear why we need the worker to async copy offsets
>> > from
>> > > > >> > the connector-specific offset topic to a global offsets topic
>> > > > >> > 2. While the reasoning you have for offset topic per connector
>> > > appears
>> > > > >> > sound, it doesn't add up with the use of transactions in
>> > > KafkaStreams.
>> > > > >> > My understanding is that KafkaStreams uses shared offsets topic
>> > with
>> > > > >> > all the other consumers, and (apparently) corrupting data and
>> > delays
>> > > > >> > by other tenants is a non-issue. Perhaps you can comment on how
>> > > > >> > Connect is different? In general much of the complexity in the
>> KIP
>> > > is
>> > > > >> > related to the separate offset topic, and I'm wondering if this
>> > can
>> > > be
>> > > > >> > avoided. The migration use-case is interesting, but not
>> related to
>> > > > >> > exactly-once and can be handled separately.
>> > > > >> > 3. Allowing users to override the isolation level for the
>> offset
>> > > > >> > reader, even when exactly-once is enabled, thereby disabling
>> > > > >> > exactly-once in a non-obvious way. I get that connect usually
>> > allows
>> > > > >> > users to shoot themselves in the foot, but are there any actual
>> > > > >> > benefits for allowing it in this case? Maybe it is better if we
>> > > don't?
>> > > > >> > I don't find the argument that we always did this to be
>> > particularly
>> > > > >> > compelling.
>> > > > >> > 4. It isn't stated explicitly, but it sounds like connect or
>> > source
>> > > > >> > connectors already have some batching mechanism, and that
>> > > transaction
>> > > > >> > boundaries will match the batches (i.e. each batch will be a
>> > > > >> > transaction?). If so, worth being explicit.
>> > > > >> > 5. "When a rebalance is triggered, before (re-)joining the
>> cluster
>> > > > >> > group, all workers will preemptively stop all tasks of all
>> source
>> > > > >> > connectors for which task configurations are present in the
>> config
>> > > > >> > topic after the latest task count record" - how will this play
>> > with
>> > > > >> > the incremental rebalances? isn't this exactly the
>> stop-the-world
>> > > > >> > rebalance we want to avoid?
>> > > > >> > 6. "the worker will instantiate a transactional producer whose
>> > > > >> > transactional ID is, by default, the group ID of the cluster
>> (but
>> > > may
>> > > > >> > be overwritten by users using the transactional.id worker
>> > > property)"
>> > > > -
>> > > > >> > If users change transactional.id property, zombie leaders
>> won't
>> > get
>> > > > >> > fenced (since they will have an older and different
>> transactional
>> > > id)
>> > > > >> >
>> > > > >> > Thanks,
>> > > > >> >
>> > > > >> > Gwen
>> > > > >> >
>> > > > >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <
>> > chr...@confluent.io
>> > > >
>> > > > >> > wrote:
>> > > > >> > >
>> > > > >> > > Hi all,
>> > > > >> > >
>> > > > >> > > I know it's a busy time with the upcoming 2.6 release and I
>> > don't
>> > > > >> expect
>> > > > >> > > this to get a lot of traction until that's done, but I've
>> > > published
>> > > > a
>> > > > >> KIP
>> > > > >> > > for allowing atomic commit of offsets and records for source
>> > > > >> connectors
>> > > > >> > and
>> > > > >> > > would appreciate your feedback:
>> > > > >> > >
>> > > > >> >
>> > > > >>
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
>> > > > >> > >
>> > > > >> > > This feature should make it possible to implement source
>> > > connectors
>> > > > >> with
>> > > > >> > > exactly-once delivery guarantees, and even allow a wide
>> range of
>> > > > >> existing
>> > > > >> > > source connectors to provide exactly-once delivery guarantees
>> > with
>> > > > no
>> > > > >> > > changes required.
>> > > > >> > >
>> > > > >> > > Cheers,
>> > > > >> > >
>> > > > >> > > Chris
>> > > > >> >
>> > > > >> >
>> > > > >> >
>> > > > >> > --
>> > > > >> > Gwen Shapira
>> > > > >> > Engineering Manager | Confluent
>> > > > >> > 650.450.2760 | @gwenshap
>> > > > >> > Follow us: Twitter | blog
>> > > > >> >
>> > > > >>
>> > > > >>
>> > > > >> --
>> > > > >> -- Guozhang
>> > > > >>
>> > > > >
>> > > >
>> > >
>> > >
>> > > --
>> > > -- Guozhang
>> > >
>> >
>>
>>
>> --
>> -- Guozhang
>>
>

Reply via email to