Chris,

Thanks for the well-written KIP and the more recent updates. It's exciting
to envision EOS for Connect source connectors!

I'll follow up this email with another that goes into specific comments and
questions not already touched on by others. This email addresses comments
and questions previously discussed on this thread.

I'd like to +1 your response to Gwen's question about connector-specific
offsets topics (#2 in her email). Importantly, though, a connector can
override any producer setting, including the Kafka credentials, and the
overriding credentials may not have permissions on the internal Connect
offsets topic. For this reason, I think it's necessary for this KIP to
address those scenarios.
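
To make the override scenario concrete, here is a minimal sketch of how
connector-level `producer.override.*` settings (the prefix introduced by
KIP-458) could end up replacing the worker's producer credentials. The class
name, method name, host name, and placeholder credential strings are all
hypothetical, and this is a simplified illustration rather than Connect's
actual code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: mimics how connector-level "producer.override." settings
// could be layered over worker-level producer settings.
class ProducerOverrideSketch {
    static final String PREFIX = "producer.override.";

    // Returns the worker's producer settings with any connector overrides applied.
    static Map<String, String> effectiveProducerConfig(Map<String, String> workerProducerConfig,
                                                       Map<String, String> connectorConfig) {
        Map<String, String> result = new LinkedHashMap<>(workerProducerConfig);
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            if (entry.getKey().startsWith(PREFIX)) {
                // Strip the prefix and let the connector's value win.
                result.put(entry.getKey().substring(PREFIX.length()), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> worker = new LinkedHashMap<>();
        worker.put("bootstrap.servers", "worker-cluster:9092");  // hypothetical host
        worker.put("sasl.jaas.config", "<worker credentials>");  // placeholder value

        Map<String, String> connector = new LinkedHashMap<>();
        connector.put("name", "example-source");                 // hypothetical connector
        connector.put("producer.override.sasl.jaas.config", "<connector credentials>");

        // The resulting producer authenticates as the connector's principal, which
        // may have no write access to the worker's internal offsets topic.
        System.out.println(effectiveProducerConfig(worker, connector));
    }
}
```

If offsets and records must go through one producer in one transaction, that
producer would carry the connector's principal, which is why the permissions
question matters here.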

Gwen's #4 question was about how transactions relate to the batches that
source connector tasks return from their `poll()` method, and you followed
up by clarifying the document. IIUC, the KIP suggests still using the
existing `offset.flush.interval.ms` property and committing transactions
based (solely?) upon the value of that property. This would seem to make
the transaction boundaries time-based rather than based on record count.
Is that really the intent? Did you consider using one transaction per
"batch" of records, whatever number of records that means? One issue with
the time-based boundary is that `offset.flush.interval.ms` defaults to 60
seconds, and that could mean many hundreds of thousands of records in a
single transaction on a high-throughput task. (BTW, I do agree with the
KIP's decision to reject the alternative of exposing transaction
boundaries to the task or letting the task define them.)
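
As a rough illustration of the difference, the sketch below simulates the two
boundary choices without touching the Kafka producer API. The throughput of
5,000 records per second, the batch sizes, the 10-second gap between batches,
and all class and method names are assumptions made up for the example; only
the 60-second default comes from the discussion above.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative simulation only; it does not use the Kafka producer API.
class TransactionBoundarySketch {

    // Back-of-the-envelope size of one transaction when commits are purely time-based.
    static long recordsPerInterval(long recordsPerSecond, long flushIntervalMs) {
        return recordsPerSecond * flushIntervalMs / 1000;
    }

    // Time-based boundary: keep one transaction open across poll() batches and
    // commit once flushIntervalMs has elapsed since the previous commit.
    static List<Integer> timeBased(int[] batchSizes, long msBetweenBatches, long flushIntervalMs) {
        List<Integer> transactionSizes = new ArrayList<>();
        long now = 0;
        long lastCommit = 0;
        int openRecords = 0;
        for (int size : batchSizes) {
            now += msBetweenBatches;
            openRecords += size;
            if (now - lastCommit >= flushIntervalMs) {
                transactionSizes.add(openRecords);
                openRecords = 0;
                lastCommit = now;
            }
        }
        if (openRecords > 0) {
            transactionSizes.add(openRecords); // final commit, e.g. on task shutdown
        }
        return transactionSizes;
    }

    // Batch-based boundary: one transaction per non-empty poll() batch.
    static List<Integer> perBatch(int[] batchSizes) {
        List<Integer> transactionSizes = new ArrayList<>();
        for (int size : batchSizes) {
            if (size > 0) {
                transactionSizes.add(size);
            }
        }
        return transactionSizes;
    }

    public static void main(String[] args) {
        // Assumed throughput of 5,000 records/s with the 60-second default interval.
        System.out.println(recordsPerInterval(5_000, 60_000));
        int[] batches = {100, 100, 100, 100, 100, 100, 100};
        System.out.println(timeBased(batches, 10_000, 60_000));
        System.out.println(perBatch(batches));
    }
}
```

At the assumed rate, a single time-based transaction would hold 300,000
records, whereas the per-batch variant keeps each transaction as small as
whatever one `poll()` call returned.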

Gwen's #5 question was about rebalances of a source connector. Your answer
made sense, but while the KIP talks about source tasks being stopped before
the connector is rebalanced, I'm wondering about the delay or other impact
this has on the rebalance itself. The "Cannot fence out producers during
rebalance" section talks about fencing out producers, but IIUC this is not
the same as:

> ... all workers will preemptively stop all tasks of all source connectors
> for which task configurations are present in the config topic after the
> latest task count record.

If this is already addressed in the KIP, please just point me to that
section.

Finally, regarding Guozhang's question about "partial rebalances" rather
than fencing out all producers: in your initial response, you said the
following:

> Connectors can arbitrarily reassign source partitions (such as
> database tables, or Kafka topic partitions) across tasks, so even if the
> assignment of tasks across workers remains unchanged, the assignment of
> source partitions across those workers might.

This is a very important point. In fact, there are definitely connectors
that shuffle around "source partition" assignments to different tasks, and
IMO we have to take that into consideration because Connect provides no
guidance on this and because there are good reasons to do so. So I'm glad
the KIP takes this into account. This is actually one of the issues I ran
into during an earlier attempt at a POC of EOS for source connectors, and
the ability to force fencing is a novel (albeit heavy-handed) way to deal
with that.
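
For anyone following along, here is a small sketch of how such shuffling can
happen even when the number of tasks and the number of source partitions stay
the same. The round-robin-over-sorted-names strategy, the table names, and the
class and method names are all hypothetical; real connectors are free to
assign source partitions however they like.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical assignment strategy; real connectors are free to use any other.
class SourcePartitionAssignmentSketch {

    // Assigns tables to task indexes round-robin over the sorted table names.
    static Map<String, Integer> assign(Collection<String> tables, int numTasks) {
        List<String> sorted = new ArrayList<>(tables);
        Collections.sort(sorted);
        Map<String, Integer> taskByTable = new TreeMap<>();
        for (int i = 0; i < sorted.size(); i++) {
            taskByTable.put(sorted.get(i), i % numTasks);
        }
        return taskByTable;
    }

    public static void main(String[] args) {
        // Two scans that each find four tables, with two tasks both times.
        List<String> firstScan = Arrays.asList("customers", "orders", "payments", "shipments");
        // "orders" was dropped and "zones" was created between the scans.
        List<String> secondScan = Arrays.asList("customers", "payments", "shipments", "zones");

        System.out.println(assign(firstScan, 2));
        System.out.println(assign(secondScan, 2));
    }
}
```

In this example "payments" moves from task 0 to task 1 and "shipments" moves
from task 1 to task 0, even though the task count and table count are
unchanged. Nothing visible to the framework signals that move.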

As I said, I'll follow up in subsequent emails on other specific questions
I have.

Best regards,

Randall

On Mon, Feb 22, 2021 at 6:45 PM Guozhang Wang <wangg...@gmail.com> wrote:

> This is a great explanation, thank you!
>
> On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton <chr...@confluent.io> wrote:
>
> > Hi Guozhang,
> >
> > Your understanding should be correct in most cases, but there are two
> > finer points that may interest you:
> >
> > 1. It's technically dependent on the implementation of the connector and
> > how it chooses to allocate source partitions across its tasks; even if
> > the number of tasks and source partitions remains completely unchanged, a
> > connector may still choose to shuffle around the partition->task
> > allocation. I can't think of any cases where this might happen off the
> > top of my head, but it seemed worth sharing given the educational nature
> > of the question.
> > 2. It's also possible that the number of source partitions remains
> > unchanged, but the set of source partitions changes. One case where this
> > might happen is with a database connector monitoring for tables that
> > match a given regex every five minutes; if a table that matched that
> > regex during the last scan got assigned to a task and then dropped, and
> > then another table that matched the regex got added before the next
> > scan, the connector would see the same number of tables, but the actual
> > set would be different. At this point, it would again be
> > connector-dependent for whether the already-assigned tables stayed
> > assigned to the same tasks. Is anyone else reminded of the various
> > consumer partition assignment strategies at this point?
> >
> > A general comment I should make here (not necessarily for your benefit
> > but for anyone following along) is that it's important to keep in mind
> > that "source partitions" in Kafka Connect aren't Kafka topic partitions
> > (well, unless your connector is designed to replicate data across Kafka
> > clusters like MirrorMaker 2). As a result, we have to rely on
> > developer-written code to a greater extent to define what the source
> > partitions for a source connector are, and how to divvy up that work
> > amongst tasks.
> >
> > Hope this helps! If you have any further questions please hit me with
> > them; I doubt you'll be the only one wondering about these things.
> >
> > Cheers,
> >
> > Chris
> >
> > On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > > Thanks Chris, yeah I think I agree with you that this does not
> > > necessarily have to be in the scope of this KIP.
> > >
> > > My understanding was that the source partitions -> tasks are not static
> > > but dynamic, but they are only changed when either the number of
> > > partitions changed or "tasks.max" config changed (please correct me if
> > > I'm wrong), so what I'm thinking is that we can try to detect if either
> > > of these things happens, and if they do not happen we can assume the
> > > mapping from partitions -> tasks does not change --- of course this
> > > requires some extension on the API, aligned with what you said. I would
> > > like to make sure that my understanding here is correct :)
> > >
> > > Guozhang
> > >
> > >
> > > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <chr...@confluent.io> wrote:
> > >
> > > > Hi Guozhang,
> > > >
> > > > Thanks for taking a look, and for your suggestion!
> > > >
> > > > I think there is room for more intelligent fencing strategies, but I
> > > > think that it'd have to be more nuanced than one based on task->worker
> > > > assignments. Connectors can arbitrarily reassign source partitions
> > > > (such as database tables, or Kafka topic partitions) across tasks, so
> > > > even if the assignment of tasks across workers remains unchanged, the
> > > > assignment of source partitions across those workers might. Connect
> > > > doesn't do any inspection of task configurations at the moment, and
> > > > without expansions to the Connector/Task API, it'd likely be impossible
> > > > to get information from tasks about their source partition assignments.
> > > > With that in mind, I think we may want to leave the door open for more
> > > > intelligent task fencing but not include that level of optimization at
> > > > this stage. Does that sound fair to you?
> > > >
> > > > There is one case that I've identified where we can cheaply optimize
> > > > right now: single-task connectors, such as the Debezium CDC source
> > > > connectors. If a connector is configured at some point with a single
> > > > task, then some other part of its configuration is altered but the
> > > > single-task aspect remains, the leader doesn't have to worry about
> > > > fencing out the older task as the new task's producer will do that
> > > > automatically. In this case, the leader can skip the producer fencing
> > > > round and just write the new task count record straight to the config
> > > > topic. I've added this case to the KIP; if it overcomplicates things I'm
> > > > happy to remove it, but seeing as it serves a real use case and comes
> > > > fairly cheap, I figured it'd be best to include now.
> > > >
> > > > Thanks again for your feedback; if you have other thoughts I'd love to
> > > > hear them!
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <chr...@confluent.io>
> > > wrote:
> > > >
> > > > > Hi Gwen,
> > > > >
> > > > > Thanks for the feedback!
> > > > >
> > > > > 0.
> > > > > That's a great point; I've updated the motivation section with that
> > > > > rationale.
> > > > >
> > > > > 1.
> > > > > This enables safe "hard downgrades" of clusters where, instead of
> > just
> > > > > disabling exactly-once support on each worker, each worker is
> rolled
> > > back
> > > > > to an earlier version of the Connect framework that doesn't support
> > > > > per-connector offsets topics altogether. Those workers would go
> back
> > to
> > > > all
> > > > > using a global offsets topic, and any offsets stored in
> per-connector
> > > > > topics would be lost to those workers. This would cause a large
> > number
> > > of
> > > > > duplicates to flood the downstream system. While technically
> > > permissible
> > > > > given that the user in this case will have knowingly switched to a
> > > > version
> > > > > of the Connect framework that doesn't support exactly-once source
> > > > > connectors (and is therefore susceptible to duplicate delivery of
> > > > records),
> > > > > the user experience in this case could be pretty bad. A similar
> > > situation
> > > > > is if users switch back from per-connector offsets topics to the
> > global
> > > > > offsets topic.
> > > > > I've tried to make this more clear in the KIP by linking to the
> "Hard
> > > > > downgrade" section from the proposed design, and by expanding on
> the
> > > > > rationale provided for redundant global offset writes in the "Hard
> > > > > downgrade" section. Let me know if you think this could be improved
> > or
> > > > > think a different approach is warranted.
> > > > >
> > > > > 2.
> > > > > I think the biggest difference between Connect and Streams comes
> from
> > > the
> > > > > fact that Connect allows users to create connectors that target
> > > different
> > > > > Kafka clusters on the same worker. This hasn't been a problem in
> the
> > > past
> > > > > because workers use two separate producers to write offset data and
> > > > source
> > > > > connector records, but in order to write offsets and records in the
> > > same
> > > > > transaction, it becomes necessary to use a single producer, which
> > also
> > > > > requires that the internal offsets topic be hosted on the same
> Kafka
> > > > > cluster that the connector is targeting.
> > > > > > This may sound like a niche use case but it was actually one of the
> > > > > > driving factors behind KIP-458 (
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
> > > > > > ), and it's a feature that we rely on heavily today.
> > > > > If there's too much going on in this KIP and we'd prefer to drop
> > > support
> > > > > for running that type of setup with exactly-once source connectors
> > for
> > > > now,
> > > > > I can propose this in a separate KIP. I figured it'd be best to get
> > > this
> > > > > out of the way with the initial introduction of exactly-once source
> > > > support
> > > > > in order to make adoption by existing Connect users as seamless as
> > > > > possible, and since we'd likely have to address this issue before
> > being
> > > > > able to utilize the feature ourselves.
> > > > > I switched around the ordering of the "motivation" section for
> > > > > per-connector offsets topics to put the biggest factor first, and
> > > called
> > > > it
> > > > > out as the major difference between Connect and Streams in this
> case.
> > > > >
> > > > > 3.
> > > > > > Fair enough, after giving it a little more thought I agree that
> > > > > > allowing users to shoot themselves in the foot is a bad idea here.
> > > > > > There's also some precedent for handling this with the
> > > > > > "enable.idempotence" and "transactional.id" producer properties; if
> > > > > > you specify a transactional ID but don't specify a value for
> > > > > > idempotence, the producer just does the right thing for you by
> > > > > > enabling idempotence and emitting a log message letting you know that
> > > > > > it's done so. I've adjusted the proposed behavior to try to use a
> > > > > > similar approach; let me know what you think.
> > > > > There is the potential gap here where, sometime in the future, a
> > third
> > > > > accepted value for the "isolation.level" property is added to the
> > > > consumer
> > > > > API and users will be unable to use that new value for their
> worker.
> > > But
> > > > > the likelihood of footgunning seems much greater than this
> scenario,
> > > and
> > > > we
> > > > > can always address expansions to the consumer API with changes to
> the
> > > > > Connect framework as well if/when that becomes necessary.
> > > > > I've also added a similar note to the source task's transactional
> ID
> > > > > property; user overrides of it will also be disabled.
> > > > >
> > > > > 4.
> > > > > Yeah, that's mostly correct. I tried to touch on this in the
> > > "Motivation"
> > > > > section with this bit:
> > > > >     > The Connect framework periodically writes source task offsets
> > to
> > > an
> > > > > internal Kafka topic at a configurable interval, once the source
> > > records
> > > > > that they correspond to have been successfully sent to Kafka.
> > > > > I've expanded on this in the "Offset (and record) writes" section,
> > and
> > > > > I've tweaked the "Motivation" section a little bit to add a link to
> > the
> > > > > relevant config property and to make the language a little more
> > > accurate.
> > > > >
> > > > > 5.
> > > > > This isn't quite as bad as stop-the-world; more like
> > > stop-the-connector.
> > > > > If a worker is running a dozen connectors and one of those (that
> > > happens
> > > > to
> > > > > be a source) is reconfigured, only the tasks for that connector
> will
> > be
> > > > > preemptively halted, and all other tasks and connectors will
> continue
> > > > > running. This is pretty close to the current behavior with
> > incremental
> > > > > rebalancing; the only difference is that, instead of waiting for
> the
> > > > > rebalance to complete before halting the tasks for that connector,
> > the
> > > > > worker will halt them in preparation for the rebalance. This
> > increases
> > > > the
> > > > > time that the tasks will be down for, but is necessary if we want
> to
> > > give
> > > > > tasks time to gracefully shut down before being fenced out by the
> > > leader,
> > > > > and shouldn't impact availability too much as it's only triggered
> on
> > > > > connector reconfiguration and rebalances generally don't take that
> > long
> > > > > with the new incremental protocol.
> > > > >
> > > > > 6.
> > > > > Ah, thanks for the catch! That's a good point. I've adjusted the
> > > proposal
> > > > > to treat the worker's transactional ID like the consumer's
> isolation
> > > > level
> > > > > and the source task's transactional ID properties; can't be
> modified
> > by
> > > > > users and, if an attempt is made to do so, will be discarded after
> > > > logging
> > > > > a message to that effect. More footguns removed, hooray!
> > > > >
> > > > > Thanks again for taking a look; if you have any other questions or
> > > > > suggestions I'm all ears.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > p.s. - Guozhang--I saw your email; response to you coming next :)
> > > > >
> > > > > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > > >
> > > > >> Hello Chris,
> > > > >>
> > > > > >> Thanks for the great write-up! I was mainly reviewing the admin
> > > > > >> fenceProducers API of the KIP. I think it makes sense overall. I'm
> > > > > >> just wondering if we can go one step further, that instead of
> > > > > >> forcing to fence out all producers of the previous generation, could
> > > > > >> we try to achieve "partial rebalance" still by first generating the
> > > > > >> new assignment, and then based on the new assignment only fence out
> > > > > >> producers involved in tasks that are indeed migrated? Just a wild
> > > > > >> thought to bring up for debate.
> > > > >>
> > > > >> Guozhang
> > > > >>
> > > > >>
> > > > >>
> > > > >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io>
> > > > wrote:
> > > > >>
> > > > >> > Hey Chris,
> > > > >> >
> > > > >> > Thank you for the proposal. Few questions / comments:
> > > > >> >
> > > > >> > 0. It may be worth pointing out in the motivation section that
> > > > >> > source-connector exactly once is more important than sink
> > connector
> > > > >> > exactly once, since many target systems will have unique key
> > > > >> > constraint mechanisms that will prevent duplicates. Kafka does
> not
> > > > >> > have any such constraints, so without this KIP-618, exactly once
> > > won't
> > > > >> > be possible.
> > > > >> > 1. I am not clear why we need the worker to async copy offsets
> > from
> > > > >> > the connector-specific offset topic to a global offsets topic
> > > > >> > 2. While the reasoning you have for offset topic per connector
> > > appears
> > > > >> > sound, it doesn't add up with the use of transactions in
> > > KafkaStreams.
> > > > >> > My understanding is that KafkaStreams uses shared offsets topic
> > with
> > > > >> > all the other consumers, and (apparently) corrupting data and
> > delays
> > > > >> > by other tenants is a non-issue. Perhaps you can comment on how
> > > > >> > Connect is different? In general much of the complexity in the
> KIP
> > > is
> > > > >> > related to the separate offset topic, and I'm wondering if this
> > can
> > > be
> > > > >> > avoided. The migration use-case is interesting, but not related
> to
> > > > >> > exactly-once and can be handled separately.
> > > > >> > 3. Allowing users to override the isolation level for the offset
> > > > >> > reader, even when exactly-once is enabled, thereby disabling
> > > > >> > exactly-once in a non-obvious way. I get that connect usually
> > allows
> > > > >> > users to shoot themselves in the foot, but are there any actual
> > > > >> > benefits for allowing it in this case? Maybe it is better if we
> > > don't?
> > > > >> > I don't find the argument that we always did this to be
> > particularly
> > > > >> > compelling.
> > > > >> > 4. It isn't stated explicitly, but it sounds like connect or
> > source
> > > > >> > connectors already have some batching mechanism, and that
> > > transaction
> > > > >> > boundaries will match the batches (i.e. each batch will be a
> > > > >> > transaction?). If so, worth being explicit.
> > > > >> > 5. "When a rebalance is triggered, before (re-)joining the
> cluster
> > > > >> > group, all workers will preemptively stop all tasks of all
> source
> > > > >> > connectors for which task configurations are present in the
> config
> > > > >> > topic after the latest task count record" - how will this play
> > with
> > > > >> > the incremental rebalances? isn't this exactly the
> stop-the-world
> > > > >> > rebalance we want to avoid?
> > > > >> > 6. "the worker will instantiate a transactional producer whose
> > > > >> > transactional ID is, by default, the group ID of the cluster
> (but
> > > may
> > > > >> > be overwritten by users using the transactional.id worker
> > > property)"
> > > > -
> > > > >> > If users change transactional.id property, zombie leaders won't
> > get
> > > > >> > fenced (since they will have an older and different
> transactional
> > > id)
> > > > >> >
> > > > >> > Thanks,
> > > > >> >
> > > > >> > Gwen
> > > > >> >
> > > > >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <
> > chr...@confluent.io
> > > >
> > > > >> > wrote:
> > > > >> > >
> > > > >> > > Hi all,
> > > > >> > >
> > > > > >> > > I know it's a busy time with the upcoming 2.6 release and I
> > > > > >> > > don't expect this to get a lot of traction until that's done,
> > > > > >> > > but I've published a KIP for allowing atomic commit of offsets
> > > > > >> > > and records for source connectors and would appreciate your
> > > > > >> > > feedback:
> > > > > >> > >
> > > > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > > > >> > >
> > > > >> > > This feature should make it possible to implement source
> > > connectors
> > > > >> with
> > > > >> > > exactly-once delivery guarantees, and even allow a wide range
> of
> > > > >> existing
> > > > >> > > source connectors to provide exactly-once delivery guarantees
> > with
> > > > no
> > > > >> > > changes required.
> > > > >> > >
> > > > >> > > Cheers,
> > > > >> > >
> > > > >> > > Chris
> > > > >> >
> > > > >> >
> > > > >> >
> > > > >> > --
> > > > >> > Gwen Shapira
> > > > >> > Engineering Manager | Confluent
> > > > >> > 650.450.2760 | @gwenshap
> > > > >> > Follow us: Twitter | blog
> > > > >> >
> > > > >>
> > > > >>
> > > > >> --
> > > > >> -- Guozhang
> > > > >>
> > > > >
> > > >
> > >
> > >
> > > --
> > > -- Guozhang
> > >
> >
>
>
> --
> -- Guozhang
>
