This is a great explanation, thank you!

On Mon, Feb 22, 2021 at 2:44 PM Chris Egerton <chr...@confluent.io> wrote:
> Hi Guozhang,
>
> Your understanding should be correct in most cases, but there are two finer points that may interest you:
>
> 1. It's technically dependent on the implementation of the connector and how it chooses to allocate source partitions across its tasks; even if the number of tasks and source partitions remains completely unchanged, a connector may still choose to shuffle around the partition->task allocation. I can't think of any cases where this might happen off the top of my head, but it seemed worth sharing given the educational nature of the question.
>
> 2. It's also possible that the number of source partitions remains unchanged, but the set of source partitions changes. One case where this might happen is with a database connector monitoring for tables that match a given regex every five minutes; if a table that matched that regex during the last scan got assigned to a task and then dropped, and then another table that matched the regex got added before the next scan, the connector would see the same number of tables, but the actual set would be different. At this point, it would again be connector-dependent whether the already-assigned tables stayed assigned to the same tasks. Is anyone else reminded of the various consumer partition assignment strategies at this point?
>
> A general comment I should make here (not necessarily for your benefit but for anyone following along) is that it's important to keep in mind that "source partitions" in Kafka Connect aren't Kafka topic partitions (well, unless your connector is designed to replicate data across Kafka clusters like MirrorMaker 2). As a result, we have to rely on developer-written code to a greater extent to define what the source partitions for a source connector are, and how to divvy up that work amongst tasks.
>
> Hope this helps! If you have any further questions please hit me with them; I doubt you'll be the only one wondering about these things.
>
> Cheers,
>
> Chris
>
> On Mon, Feb 22, 2021 at 5:30 PM Guozhang Wang <wangg...@gmail.com> wrote:
>
> > Thanks Chris, yeah I think I agree with you that this does not necessarily have to be in the scope of this KIP.
> >
> > My understanding was that the source partition -> task mappings are not static but dynamic; they are only changed when either the number of partitions changes or the "tasks.max" config changes (please correct me if I'm wrong). So what I'm thinking is that we can try to detect whether either of these things happens, and if neither does, we can assume the mapping from partitions -> tasks does not change --- of course this requires some extension on the API, aligned with what you said. I would like to make sure that my understanding here is correct :)
> >
> > Guozhang
> >
> > On Mon, Feb 22, 2021 at 11:29 AM Chris Egerton <chr...@confluent.io> wrote:
> >
> > > Hi Guozhang,
> > >
> > > Thanks for taking a look, and for your suggestion!
> > >
> > > I think there is room for more intelligent fencing strategies, but I think that it'd have to be more nuanced than one based on task->worker assignments. Connectors can arbitrarily reassign source partitions (such as database tables, or Kafka topic partitions) across tasks, so even if the assignment of tasks across workers remains unchanged, the assignment of source partitions across those workers might.
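
For anyone following along, here is a rough sketch of the kind of connector-controlled allocation Chris is describing: the taskConfigs() method below spreads hypothetical database tables across tasks, and nothing in the framework prevents a connector from returning a different mapping on every call. The TableSourceConnector name, the "tables" config key, and the hard-coded table list are all invented for illustration; this is not code from the KIP.

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical connector: each "source partition" is a database table.
public class TableSourceConnector extends SourceConnector {

    private List<String> tables;

    @Override
    public void start(Map<String, String> props) {
        // A real connector would discover these by scanning the database.
        tables = Arrays.asList("orders", "customers", "shipments");
    }

    @Override
    public List<Map<String, String>> taskConfigs(int maxTasks) {
        int numGroups = Math.min(maxTasks, tables.size());
        List<Map<String, String>> taskConfigs = new ArrayList<>();
        for (int i = 0; i < numGroups; i++) {
            taskConfigs.add(new HashMap<>());
        }
        // Round-robin the tables across tasks. The framework never inspects this
        // mapping, so a connector is free to produce a different one on every call.
        for (int i = 0; i < tables.size(); i++) {
            taskConfigs.get(i % numGroups)
                    .merge("tables", tables.get(i), (existing, added) -> existing + "," + added);
        }
        return taskConfigs;
    }

    @Override
    public Class<? extends Task> taskClass() {
        return TableSourceTask.class; // hypothetical SourceTask implementation
    }

    @Override
    public void stop() {
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public String version() {
        return "0.0.0";
    }
}

A worker only ever sees the maps returned here, not how they were derived, which is why the framework can't cheaply tell whether a reconfiguration actually moved source partitions between tasks.
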
> > > Connect doesn't do any inspection of task configurations at the moment, and without expansions to the Connector/Task API, it'd likely be impossible to get information from tasks about their source partition assignments. With that in mind, I think we may want to leave the door open for more intelligent task fencing but not include that level of optimization at this stage. Does that sound fair to you?
> > >
> > > There is one case that I've identified where we can cheaply optimize right now: single-task connectors, such as the Debezium CDC source connectors. If a connector is configured at some point with a single task, and then some other part of its configuration is altered but the single-task aspect remains, the leader doesn't have to worry about fencing out the older task, as the new task's producer will do that automatically. In this case, the leader can skip the producer fencing round and just write the new task count record straight to the config topic. I've added this case to the KIP; if it overcomplicates things I'm happy to remove it, but seeing as it serves a real use case and comes fairly cheap, I figured it'd be best to include it now.
> > >
> > > Thanks again for your feedback; if you have other thoughts I'd love to hear them!
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, Feb 22, 2021 at 1:57 PM Chris Egerton <chr...@confluent.io> wrote:
> > >
> > > > Hi Gwen,
> > > >
> > > > Thanks for the feedback!
> > > >
> > > > 0. That's a great point; I've updated the motivation section with that rationale.
> > > >
> > > > 1. This enables safe "hard downgrades" of clusters where, instead of just disabling exactly-once support on each worker, each worker is rolled back to an earlier version of the Connect framework that doesn't support per-connector offsets topics at all. Those workers would go back to all using a global offsets topic, and any offsets stored in per-connector topics would be lost to those workers. This would cause a large number of duplicates to flood the downstream system. While technically permissible, given that the user in this case will have knowingly switched to a version of the Connect framework that doesn't support exactly-once source connectors (and is therefore susceptible to duplicate delivery of records), the user experience in this case could be pretty bad. A similar situation is if users switch back from per-connector offsets topics to the global offsets topic.
> > > > I've tried to make this clearer in the KIP by linking to the "Hard downgrade" section from the proposed design, and by expanding on the rationale provided for redundant global offset writes in the "Hard downgrade" section. Let me know if you think this could be improved or whether a different approach is warranted.
> > > >
> > > > 2. I think the biggest difference between Connect and Streams comes from the fact that Connect allows users to create connectors that target different Kafka clusters on the same worker.
> > > > This hasn't been a problem in the past because workers use two separate producers to write offset data and source connector records, but in order to write offsets and records in the same transaction, it becomes necessary to use a single producer, which also requires that the internal offsets topic be hosted on the same Kafka cluster that the connector is targeting.
> > > > This may sound like a niche use case but it was actually one of the driving factors behind KIP-458 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy), and it's a feature that we rely on heavily today.
> > > > If there's too much going on in this KIP and we'd prefer to drop support for running that type of setup with exactly-once source connectors for now, I can propose this in a separate KIP. I figured it'd be best to get this out of the way with the initial introduction of exactly-once source support in order to make adoption by existing Connect users as seamless as possible, and since we'd likely have to address this issue before being able to utilize the feature ourselves.
> > > > I switched around the ordering of the "motivation" section for per-connector offsets topics to put the biggest factor first, and called it out as the major difference between Connect and Streams in this case.
> > > >
> > > > 3. Fair enough, after giving it a little more thought I agree that allowing users to shoot themselves in the foot is a bad idea here. There's also some precedent for handling this with the "enable.idempotence" and "transactional.id" producer properties; if you specify a transactional ID but don't specify a value for idempotence, the producer just does the right thing for you by enabling idempotence and emitting a log message letting you know that it's done so. I've adjusted the proposed behavior to try to use a similar approach; let me know what you think.
> > > > There is the potential gap here where, sometime in the future, a third accepted value for the "isolation.level" property is added to the consumer API and users will be unable to use that new value for their worker. But the likelihood of footgunning seems much greater than this scenario, and we can always address expansions to the consumer API with changes to the Connect framework as well if/when that becomes necessary.
> > > > I've also added a similar note to the source task's transactional ID property; user overrides of it will also be disabled.
> > > >
> > > > 4. Yeah, that's mostly correct. I tried to touch on this in the "Motivation" section with this bit:
> > > > > The Connect framework periodically writes source task offsets to an internal Kafka topic at a configurable interval, once the source records that they correspond to have been successfully sent to Kafka.
> > > > I've expanded on this in the "Offset (and record) writes" section, and I've tweaked the "Motivation" section a little bit to add a link to the relevant config property and to make the language a little more accurate.
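
To make the quoted bit about offsets concrete for readers who haven't written a source task: the offsets in question are the sourcePartition/sourceOffset maps attached to each SourceRecord, roughly as sketched below. The TableSourceTask name and the "table"/"position" fields are invented for illustration and match the hypothetical connector sketched earlier; none of this is code from the KIP.

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical task: reads rows from a single table assigned by the connector.
public class TableSourceTask extends SourceTask {

    private String table;
    private long position;

    @Override
    public void start(Map<String, String> props) {
        table = props.get("tables"); // assigned by the connector's taskConfigs()
        position = 0L;
    }

    @Override
    public List<SourceRecord> poll() {
        // The sourcePartition and sourceOffset maps are what the framework persists to
        // its offsets topic once the record itself has been acknowledged by the broker.
        Map<String, String> sourcePartition = Collections.singletonMap("table", table);
        Map<String, Long> sourceOffset = Collections.singletonMap("position", ++position);
        SourceRecord record = new SourceRecord(
                sourcePartition, sourceOffset, "destination-topic",
                Schema.STRING_SCHEMA, "row " + position + " from " + table);
        return Collections.singletonList(record);
    }

    @Override
    public void stop() {
    }

    @Override
    public String version() {
        return "0.0.0";
    }
}

Today those maps are committed by a separate producer on a timer (offset.flush.interval.ms); the change being discussed is to write them in the same transaction as the records they describe.
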
> > > > 5. This isn't quite as bad as stop-the-world; it's more like stop-the-connector. If a worker is running a dozen connectors and one of those (that happens to be a source) is reconfigured, only the tasks for that connector will be preemptively halted, and all other tasks and connectors will continue running. This is pretty close to the current behavior with incremental rebalancing; the only difference is that, instead of waiting for the rebalance to complete before halting the tasks for that connector, the worker will halt them in preparation for the rebalance. This increases the time that the tasks will be down for, but it is necessary if we want to give tasks time to gracefully shut down before being fenced out by the leader, and it shouldn't impact availability too much, since it's only triggered on connector reconfiguration and rebalances generally don't take that long with the new incremental protocol.
> > > >
> > > > 6. Ah, thanks for the catch! That's a good point. I've adjusted the proposal to treat the worker's transactional ID like the consumer's isolation level and the source task's transactional ID properties: it can't be modified by users and, if an attempt is made to do so, the override will be discarded after logging a message to that effect. More footguns removed, hooray!
> > > >
> > > > Thanks again for taking a look; if you have any other questions or suggestions I'm all ears.
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > > > p.s. - Guozhang--I saw your email; response to you coming next :)
> > > >
> > > > On Sun, Feb 21, 2021 at 4:12 PM Guozhang Wang <wangg...@gmail.com> wrote:
> > > >
> > > >> Hello Chris,
> > > >>
> > > >> Thanks for the great write-up! I was mainly reviewing the admin fenceProducers API of the KIP. I think it makes sense overall. I'm just wondering if we can go one step further: instead of forcing a fence-out of all producers of the previous generation, could we try to achieve a "partial rebalance" by first generating the new assignment, and then, based on that new assignment, only fencing out producers involved in tasks that are actually migrated? Just a wild thought to bring up for debate.
> > > >>
> > > >> Guozhang
> > > >>
> > > >> On Sat, Feb 20, 2021 at 10:20 PM Gwen Shapira <g...@confluent.io> wrote:
> > > >>
> > > >> > Hey Chris,
> > > >> >
> > > >> > Thank you for the proposal. A few questions / comments:
> > > >> >
> > > >> > 0. It may be worth pointing out in the motivation section that source-connector exactly-once is more important than sink-connector exactly-once, since many target systems will have unique-key constraint mechanisms that will prevent duplicates. Kafka does not have any such constraints, so without KIP-618, exactly-once won't be possible.
> > > >> > 1. I am not clear why we need the worker to asynchronously copy offsets from the connector-specific offsets topic to a global offsets topic.
> > > >> > 2. While the reasoning you have for an offsets topic per connector appears sound, it doesn't add up with the use of transactions in Kafka Streams.
> > > >> > My understanding is that Kafka Streams uses the shared offsets topic along with all the other consumers, and (apparently) corrupting data and delays caused by other tenants are a non-issue. Perhaps you can comment on how Connect is different? In general, much of the complexity in the KIP is related to the separate offsets topic, and I'm wondering if this can be avoided. The migration use case is interesting, but it is not related to exactly-once and can be handled separately.
> > > >> > 3. Allowing users to override the isolation level for the offset reader, even when exactly-once is enabled, disables exactly-once in a non-obvious way. I get that Connect usually allows users to shoot themselves in the foot, but are there any actual benefits to allowing it in this case? Maybe it is better if we don't? I don't find the argument that we always did this to be particularly compelling.
> > > >> > 4. It isn't stated explicitly, but it sounds like Connect or source connectors already have some batching mechanism, and that transaction boundaries will match the batches (i.e. each batch will be a transaction?). If so, it is worth being explicit.
> > > >> > 5. "When a rebalance is triggered, before (re-)joining the cluster group, all workers will preemptively stop all tasks of all source connectors for which task configurations are present in the config topic after the latest task count record" - how will this play with the incremental rebalances? Isn't this exactly the stop-the-world rebalance we want to avoid?
> > > >> > 6. "the worker will instantiate a transactional producer whose transactional ID is, by default, the group ID of the cluster (but may be overwritten by users using the transactional.id worker property)" - if users change the transactional.id property, zombie leaders won't get fenced (since they will have an older and different transactional ID).
> > > >> >
> > > >> > Thanks,
> > > >> >
> > > >> > Gwen
> > > >> >
> > > >> > On Thu, May 21, 2020 at 11:21 PM Chris Egerton <chr...@confluent.io> wrote:
> > > >> >
> > > >> > > Hi all,
> > > >> > >
> > > >> > > I know it's a busy time with the upcoming 2.6 release and I don't expect this to get a lot of traction until that's done, but I've published a KIP for allowing atomic commit of offsets and records for source connectors and would appreciate your feedback:
> > > >> > >
> > > >> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Atomic+commit+of+source+connector+records+and+offsets
> > > >> > >
> > > >> > > This feature should make it possible to implement source connectors with exactly-once delivery guarantees, and even allow a wide range of existing source connectors to provide exactly-once delivery guarantees with no changes required.
> > > >> > > Cheers,
> > > >> > >
> > > >> > > Chris
> > > >> >
> > > >> > --
> > > >> > Gwen Shapira
> > > >> > Engineering Manager | Confluent
> > > >> > 650.450.2760 | @gwenshap
> > > >> > Follow us: Twitter | blog
> > > >>
> > > >> --
> > > >> -- Guozhang
> >
> > --
> > -- Guozhang

--
-- Guozhang
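
For readers skimming the archive: the "atomic commit of offsets and records" described in the KIP announcement above boils down to the standard Kafka transactional-producer pattern, sketched roughly below. The topic names, the transactional ID, and the JSON-ish offset payload are made up for illustration, and the KIP handles all of this inside the Connect framework rather than in connector code; this only shows the mechanics. Note that initTransactions() is also what fences out an older "zombie" producer sharing the same transactional ID, which is why the choice of transactional ID matters in the discussion above.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class AtomicOffsetAndRecordSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Sharing a transactional ID is what lets a newer producer fence out an older
        // "zombie" with the same ID when initTransactions() is called.
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "connect-cluster-my-connector-0");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Fences any older producer instance that used the same transactional ID.
            producer.initTransactions();

            // One transaction per batch: the source records and the offset record that
            // tracks them are committed together, or not at all.
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("destination-topic", "key-1", "record polled from the source system"));
            producer.send(new ProducerRecord<>("my-offsets-topic",
                    "{\"table\":\"orders\"}", "{\"position\":42}"));
            producer.commitTransaction();
        }
    }
}

A consumer reading with isolation.level=read_committed, as proposed for the worker's offset reader, will then either see both the records and the offset that tracks them, or neither.
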