This is a great explanation, thank you!

On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton <chr...@confluent.io> wrote:

> Hi Guozhang,
>
> Your understanding should be correct in most cases, but there are two finer
> points that may interest you:
>
> 1. It's technically dependent on the implementation of the connector and
> how it chooses to allocate source partitions across its tasks; even if the
> number of tasks and source partitions remains completely unchanged, a
> connector may still choose to shuffle around the partition->task
> allocation. I can't think of any cases where this might happen off the top
> of my head, but it seemed worth sharing given the educational nature of the
> question.
> 2. It's also possible that the number of source partitions remains
> unchanged, but the set of source partitions changes. One case where this
> might happen is with a database connector monitoring for tables that match
> a given regex every five minutes; if a table that matched that regex during
> the last scan got assigned to a task and then dropped, and then another
> table that matched the regex got added before the next scan, the connector
> would see the same number of tables, but the actual set would be different.
> At this point, it would again be connector-dependent for whether the
> already-assigned tables stayed assigned to the same tasks. Is anyone else
> reminded of the various consumer partition assignment strategies at this
> point?
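>
> To make that concrete, here's a rough sketch of my own (not taken from
> any real connector) of the kind of developer-written allocation logic
> involved; note how dropping one table and adding another between scans
> can shuffle the table->task mapping even though the number of tables
> and tasks stays the same:
>
> import java.util.ArrayList;
> import java.util.HashMap;
> import java.util.List;
> import java.util.Map;
>
> public class TableAllocationSketch {
>     // Hypothetical stand-in for a connector's taskConfigs() logic.
>     static List<Map<String, String>> taskConfigs(List<String> tables,
>                                                  int maxTasks) {
>         int numTasks = Math.min(maxTasks, Math.max(1, tables.size()));
>         List<Map<String, String>> configs = new ArrayList<>();
>         for (int i = 0; i < numTasks; i++) {
>             configs.add(new HashMap<>());
>         }
>         // Round-robin by list position: if the table set changes between
>         // scans, positions shift and tables can land on different tasks.
>         for (int i = 0; i < tables.size(); i++) {
>             configs.get(i % numTasks)
>                    .merge("tables", tables.get(i), (a, b) -> a + "," + b);
>         }
>         return configs;
>     }
>
>     public static void main(String[] args) {
>         System.out.println(
>             taskConfigs(List.of("users", "orders", "payments"), 2));
>     }
> }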
>
> A general comment I should make here (not necessarily for your benefit but
> for anyone following along) is that it's important to keep in mind that
> "source partitions" in Kafka Connect aren't Kafka topic partitions (well,
> unless your connector is designed to replicate data across Kafka clusters
> like MirrorMaker 2). As a result, we have to rely on developer-written code
> to a greater extent to define what the source partitions for a source
> connector are, and how to divvy up that work amongst tasks.
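>
> For example (again, a sketch of my own with made-up field names), a task
> for a hypothetical database connector might emit records like the one
> below, where the source partition and offset are arbitrary
> developer-defined maps rather than anything tied to a Kafka topic
> partition; the framework just periodically commits those offset maps to
> the offsets topic without knowing what they mean:
>
> import java.util.Collections;
> import java.util.Map;
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.source.SourceRecord;
>
> public class SourcePartitionSketch {
>     public static SourceRecord example() {
>         // Developer-defined "source partition": here, a table name.
>         Map<String, String> sourcePartition =
>             Collections.singletonMap("table", "orders");
>         // Developer-defined "source offset": here, the last row read.
>         Map<String, Long> sourceOffset =
>             Collections.singletonMap("row_id", 42L);
>         return new SourceRecord(sourcePartition, sourceOffset,
>             "orders-topic", Schema.STRING_SCHEMA, "some row data");
>     }
>
>     public static void main(String[] args) {
>         System.out.println(example());
>     }
> }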
>
> Hope this helps! If you have any further questions please hit me with them;
> I doubt you'll be the only one wondering about these things.
>
> Cheers,
>
> Chris
>
> On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Thanks Chris, yeah I think I agree with you that this does not
> > necessarily have to be in the scope of this KIP.
> >
> > My understanding was that the source partitions -> tasks mapping is not
> > static but dynamic, but it only changes when either the number of
> > partitions changes or the "tasks.max" config changes (please correct me
> > if I'm wrong). So what I'm thinking is that we can try to detect if
> > either of these things happens, and if they do not happen we can assume
> > the mapping from partitions -> tasks does not change --- of course this
> > requires some extension on the API, aligned with what you said. I would
> > like to make sure that my understanding here is correct :)
> >
> > Guozhang
> >
> >
> > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <chr...@confluent.io>
> > wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for taking a look, and for your suggestion!
> > >
> > > I think there is room for more intelligent fencing strategies, but I
> > > think that it'd have to be more nuanced than one based on task->worker
> > > assignments. Connectors can arbitrarily reassign source partitions
> > > (such as database tables, or Kafka topic partitions) across tasks, so
> > > even if the assignment of tasks across workers remains unchanged, the
> > > assignment of source partitions across those workers might. Connect
> > > doesn't do any inspection of task configurations at the moment, and
> > > without expansions to the Connector/Task API, it'd likely be
> > > impossible to get information from tasks about their source partition
> > > assignments. With that in mind, I think we may want to leave the door
> > > open for more intelligent task fencing but not include that level of
> > > optimization at this stage. Does that sound fair to you?
> > >
> > > There is one case that I've identified where we can cheaply optimize
> > > right now: single-task connectors, such as the Debezium CDC source
> > > connectors. If a connector is configured at some point with a single
> > > task, and then some other part of its configuration is altered while
> > > the single-task aspect remains, the leader doesn't have to worry about
> > > fencing out the older task as the new task's producer will do that
> > > automatically. In this case, the leader can skip the producer fencing
> > > round and just write the new task count record straight to the config
> > > topic. I've added this case to the KIP; if it overcomplicates things
> > > I'm happy to remove it, but seeing as it serves a real use case and
> > > comes fairly cheap, I figured it'd be best to include it now.
> > >
> > > Thanks again for your feedback; if you have other thoughts I'd love to
> > > hear them!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <chr...@confluent.io>
> > > wrote:
> > >
> > > > Hi Gwen,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > 0.
> > > > That's a great point; I've updated the motivation section with that
> > > > rationale.
> > > >
> > > > 1.
> > > > This enables safe "hard downgrades" of clusters where, instead of
> > > > just disabling exactly-once support on each worker, each worker is
> > > > rolled back to an earlier version of the Connect framework that
> > > > doesn't support per-connector offsets topics altogether. Those
> > > > workers would go back to all using a global offsets topic, and any
> > > > offsets stored in per-connector topics would be lost to those
> > > > workers. This would cause a large number of duplicates to flood the
> > > > downstream system. While technically permissible given that the user
> > > > in this case will have knowingly switched to a version of the
> > > > Connect framework that doesn't support exactly-once source
> > > > connectors (and is therefore susceptible to duplicate delivery of
> > > > records), the user experience in this case could be pretty bad. A
> > > > similar situation is if users switch back from per-connector offsets
> > > > topics to the global offsets topic.
> > > > I've tried to make this more clear in the KIP by linking to the
> > > > "Hard downgrade" section from the proposed design, and by expanding
> > > > on the rationale provided for redundant global offset writes in the
> > > > "Hard downgrade" section. Let me know if you think this could be
> > > > improved or think a different approach is warranted.
> > > >
> > > > 2.
> > > > I think the biggest difference between Connect and Streams comes
> > > > from the fact that Connect allows users to create connectors that
> > > > target different Kafka clusters on the same worker. This hasn't been
> > > > a problem in the past because workers use two separate producers to
> > > > write offset data and source connector records, but in order to
> > > > write offsets and records in the same transaction, it becomes
> > > > necessary to use a single producer, which also requires that the
> > > > internal offsets topic be hosted on the same Kafka cluster that the
> > > > connector is targeting.
> > > > This may sound like a niche use case but it was actually one of the
> > > > driving factors behind KIP-458 (
> > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy
> > > > ), and it's a feature that we rely on heavily today.
> > > > If there's too much going on in this KIP and we'd prefer to drop
> > > > support for running that type of setup with exactly-once source
> > > > connectors for now, I can propose this in a separate KIP. I figured
> > > > it'd be best to get this out of the way with the initial
> > > > introduction of exactly-once source support in order to make
> > > > adoption by existing Connect users as seamless as possible, and
> > > > since we'd likely have to address this issue before being able to
> > > > utilize the feature ourselves.
> > > > I switched around the ordering of the "motivation" section for
> > > > per-connector offsets topics to put the biggest factor first, and
> > > > called it out as the major difference between Connect and Streams in
> > > > this case.
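> > > >
> > > > For anyone following along, here's a rough sketch of the kind of
> > > > per-connector override KIP-458 enables (the connector class and
> > > > addresses are made up, and this assumes the worker sets
> > > > connector.client.config.override.policy=All):
> > > >
> > > > import java.util.Map;
> > > >
> > > > public class ClientOverrideSketch {
> > > >     public static void main(String[] args) {
> > > >         // Connector config whose records go to a different Kafka
> > > >         // cluster than the one backing the Connect worker itself.
> > > >         Map<String, String> connectorConfig = Map.of(
> > > >             "name", "other-cluster-source",
> > > >             "connector.class", "com.example.SomeSourceConnector",
> > > >             "tasks.max", "1",
> > > >             "producer.override.bootstrap.servers", "other-kafka:9092"
> > > >         );
> > > >         System.out.println(connectorConfig);
> > > >     }
> > > > }
> > > >
> > > > With exactly-once support, the offsets for a connector like that
> > > > would also need to live on the cluster its producer targets, which
> > > > is exactly where the per-connector offsets topic comes in.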
> > > >
> > > > 3.
> > > > Fair enough, after giving it a little more thought I agree that
> > > > allowing users to shoot themselves in the foot is a bad idea here.
> > > > There's also some precedent for handling this with the
> > > > "enable.idempotence" and "transactional.id" producer properties; if
> > > > you specify a transactional ID but don't specify a value for
> > > > idempotence, the producer just does the right thing for you by
> > > > enabling idempotence and emitting a log message letting you know
> > > > that it's done so. I've adjusted the proposed behavior to try to use
> > > > a similar approach; let me know what you think.
> > > > There is the potential gap here where, sometime in the future, a
> > > > third accepted value for the "isolation.level" property is added to
> > > > the consumer API and users will be unable to use that new value for
> > > > their worker. But the likelihood of footgunning seems much greater
> > > > than this scenario, and we can always address expansions to the
> > > > consumer API with changes to the Connect framework as well if/when
> > > > that becomes necessary.
> > > > I've also added a similar note to the source task's transactional ID
> > > > property; user overrides of it will also be disabled.
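> > > >
> > > > As a quick illustration of that existing producer behavior (a small
> > > > example of my own, not framework code): transactional.id is set,
> > > > enable.idempotence is left alone, and the producer enables
> > > > idempotence itself and logs that it has done so rather than failing.
> > > >
> > > > import java.util.Properties;
> > > > import org.apache.kafka.clients.producer.KafkaProducer;
> > > > import org.apache.kafka.clients.producer.ProducerConfig;
> > > > import org.apache.kafka.common.serialization.StringSerializer;
> > > >
> > > > public class TransactionalProducerSketch {
> > > >     public static void main(String[] args) {
> > > >         Properties props = new Properties();
> > > >         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
> > > >             "localhost:9092"); // placeholder broker address
> > > >         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
> > > >             StringSerializer.class.getName());
> > > >         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
> > > >             StringSerializer.class.getName());
> > > >         props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,
> > > >             "demo-transactional-id");
> > > >         // No explicit enable.idempotence; the producer turns it on.
> > > >         try (KafkaProducer<String, String> producer =
> > > >                  new KafkaProducer<>(props)) {
> > > >             producer.initTransactions();
> > > >         }
> > > >     }
> > > > }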
> > > >
> > > > 4.
> > > > Yeah, that's mostly correct. I tried to touch on this in the
> > > > "Motivation" section with this bit:
> > > >     > The Connect framework periodically writes source task offsets
> > > >     > to an internal Kafka topic at a configurable interval, once
> > > >     > the source records that they correspond to have been
> > > >     > successfully sent to Kafka.
> > > > I've expanded on this in the "Offset (and record) writes" section,
> > > > and I've tweaked the "Motivation" section a little bit to add a link
> > > > to the relevant config property and to make the language a little
> > > > more accurate.
> > > >
> > > > 5.
> > > > This isn't quite as bad as stop-the-world; more like
> > > > stop-the-connector. If a worker is running a dozen connectors and
> > > > one of those (that happens to be a source) is reconfigured, only the
> > > > tasks for that connector will be preemptively halted, and all other
> > > > tasks and connectors will continue running. This is pretty close to
> > > > the current behavior with incremental rebalancing; the only
> > > > difference is that, instead of waiting for the rebalance to complete
> > > > before halting the tasks for that connector, the worker will halt
> > > > them in preparation for the rebalance. This increases the time that
> > > > the tasks will be down for, but is necessary if we want to give
> > > > tasks time to gracefully shut down before being fenced out by the
> > > > leader, and shouldn't impact availability too much as it's only
> > > > triggered on connector reconfiguration and rebalances generally
> > > > don't take that long with the new incremental protocol.
> > > >
> > > > 6.
> > > > Ah, thanks for the catch! That's a good point. I've adjusted the
> > > > proposal to treat the worker's transactional ID like the consumer's
> > > > isolation level and the source task's transactional ID properties:
> > > > it can't be modified by users and, if an attempt is made to do so,
> > > > will be discarded after logging a message to that effect. More
> > > > footguns removed, hooray!
> > > >
> > > > Thanks again for taking a look; if you have any other questions or
> > > > suggestions I'm all ears.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > p.s. - Guozhang--I saw your email; response to you coming next :)
> > > >
> > > > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com>
> > > > wrote:
> > > >
> > > >> Hello Chris,
> > > >>
> > > >> Thanks for the great write-up! I was mainly reviewing the admin
> > > >> fenceProducers API of the KIP. I think it makes sense overall. I'm
> > > >> just wondering if we can go one step further: instead of forcing to
> > > >> fence out all producers of the previous generation, could we try to
> > > >> achieve a "partial rebalance" by first generating the new
> > > >> assignment, and then, based on the new assignment, only fencing out
> > > >> producers involved in tasks that are indeed migrated? Just a wild
> > > >> thought to bring up for debate.
> > > >>
> > > >> Guozhang
> > > >>
> > > >>
> > > >>
> > > >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io>
> > > >> wrote:
> > > >>
> > > >> > Hey Chris,
> > > >> >
> > > >> > Thank you for the proposal. Few questions / comments:
> > > >> >
> > > >> > 0. It may be worth pointing out in the motivation section that
> > > >> > source-connector exactly once is more important than sink
> > > >> > connector exactly once, since many target systems will have
> > > >> > unique key constraint mechanisms that will prevent duplicates.
> > > >> > Kafka does not have any such constraints, so without KIP-618,
> > > >> > exactly once won't be possible.
> > > >> > 1. I am not clear why we need the worker to async copy offsets
> > > >> > from the connector-specific offset topic to a global offsets
> > > >> > topic.
> > > >> > 2. While the reasoning you have for an offset topic per connector
> > > >> > appears sound, it doesn't add up with the use of transactions in
> > > >> > KafkaStreams. My understanding is that KafkaStreams uses a shared
> > > >> > offsets topic with all the other consumers, and (apparently)
> > > >> > corrupting data and delays by other tenants is a non-issue.
> > > >> > Perhaps you can comment on how Connect is different? In general
> > > >> > much of the complexity in the KIP is related to the separate
> > > >> > offset topic, and I'm wondering if this can be avoided. The
> > > >> > migration use-case is interesting, but not related to
> > > >> > exactly-once and can be handled separately.
> > > >> > 3. Allowing users to override the isolation level for the offset
> > > >> > reader, even when exactly-once is enabled, disables exactly-once
> > > >> > in a non-obvious way. I get that Connect usually allows users to
> > > >> > shoot themselves in the foot, but are there any actual benefits
> > > >> > to allowing it in this case? Maybe it is better if we don't? I
> > > >> > don't find the argument that we always did this to be
> > > >> > particularly compelling.
> > > >> > 4. It isn't stated explicitly, but it sounds like Connect or
> > > >> > source connectors already have some batching mechanism, and that
> > > >> > transaction boundaries will match the batches (i.e. each batch
> > > >> > will be a transaction?). If so, it's worth being explicit.
> > > >> > 5. "When a rebalance is triggered, before (re-)joining the
> > > >> > cluster group, all workers will preemptively stop all tasks of
> > > >> > all source connectors for which task configurations are present
> > > >> > in the config topic after the latest task count record" - how
> > > >> > will this play with the incremental rebalances? Isn't this
> > > >> > exactly the stop-the-world rebalance we want to avoid?
> > > >> > 6. "the worker will instantiate a transactional producer whose
> > > >> > transactional ID is, by default, the group ID of the cluster (but
> > > >> > may be overwritten by users using the transactional.id worker
> > > >> > property)" - If users change the transactional.id property,
> > > >> > zombie leaders won't get fenced (since they will have an older
> > > >> > and different transactional id)
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Gwen
> > > >> >
> > > >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io>
> > > >> > wrote:
> > > >> > >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > I know it's a busy time with the upcoming 2.6 release and I
> > > >> > > don't expect this to get a lot of traction until that's done,
> > > >> > > but I've published a KIP for allowing atomic commit of offsets
> > > >> > > and records for source connectors and would appreciate your
> > > >> > > feedback:
> > > >> > >
> > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > > >> > >
> > > >> > > This feature should make it possible to implement source
> > > >> > > connectors with exactly-once delivery guarantees, and even
> > > >> > > allow a wide range of existing source connectors to provide
> > > >> > > exactly-once delivery guarantees with no changes required.
> > > >> > >
> > > >> > > Cheers,
> > > >> > >
> > > >> > > Chris
> > > >> >
> > > >> >
> > > >> >
> > > >> > --
> > > >> > Gwen Shapira
> > > >> > Engineering Manager | Confluent
> > > >> > 650.450.2760 | @gwenshap
> > > >> > Follow us: Twitter | blog
> > > >> >
> > > >>
> > > >>
> > > >> --
> > > >> -- Guozhang
> > > >>
> > > >
> > >
> >
> >
> > --
> > -- Guozhang
> >
>


-- 
-- Guozhang
