Hi Chris,

Thanks for clarifying; I had missed that update in the KIP (the part about the response for altering/resetting offsets). I think your arguments against adding a separate method or a custom return type make sense.
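For anyone following along without the KIP open, my reading of the hook in question is roughly the sketch below. The class name, method name, and signature here are only an illustrative paraphrase of the KIP (the KIP itself is the source of truth); the point is just the contract of the boolean return value we discussed:

    import java.util.Map;

    /**
     * Illustrative sketch only -- not the actual Connect API. The real hook lives on
     * SourceConnector (with a sink-side counterpart), and its exact name and signature
     * are whatever the KIP specifies.
     */
    public class OffsetsHookSketch {

        /**
         * Invoked by the runtime when a user requests that this connector's offsets be
         * altered or reset. An implementation can validate the requested offsets (or
         * update externally-managed offsets) and throw an exception to reject the request.
         *
         * @return true if the connector has explicitly handled/verified the request;
         *         the default of false tells the runtime that the connector did not
         *         override the hook, so the REST response reports "unknown" support.
         */
        public boolean alterOffsets(Map<String, String> connectorConfig,
                                    Map<Map<String, ?>, Map<String, ?>> offsets) {
            return false;
        }
    }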
Thanks, Yash On Sat, Dec 10, 2022 at 12:28 AM Chris Egerton <chr...@aiven.io.invalid> wrote: > Hi Yash, > > The idea with the boolean is to just signify that a connector has > overridden this method, which allows us to issue a definitive response in > the REST API when servicing offset alter/reset requests (described more in > detail here: > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Altering/resettingoffsets(response) > ). > > One alternative could be to add some kind of OffsetAlterResult return type > for this method with constructors like "OffsetAlterResult.success()", > "OffsetAlterResult.unknown()" (default), and > "OffsetAlterResult.failure(Throwable t)", but it's hard to envision a > reason to use the "failure" static factory method instead of just throwing > an exception, at which point, we're only left with two reasonable methods: > success, and unknown. > > Another could be to add a "boolean canResetOffsets()" method to the > SourceConnector class, but adding a separate method seems like overkill, > and developers may not understand that implementing that method to always > return "false" won't actually cause offset reset requests to not take > place. > > One final option could be to have something like an AlterOffsetsSupport > enum with values SUPPORTED and UNSUPPORTED and a new "AlterOffsetsSupport > alterOffsetsSupport()" SourceConnector method that returns null by default > (which implicitly maps to the "unknown support" response message in the > REST API). This would line up with the ExactlyOnceSupport API we added in > KIP-618. However, I'm hesitant to adopt it because it's not strictly > necessary to address the cases that we want to cover; everything can be > handled with a single method that gets invoked on offset alter/reset > requests. With exactly-once support, we added this hook because it was > designed to be invoked in a different point in the connector lifecycle. > > Cheers, > > Chris > > On Wed, Dec 7, 2022 at 9:46 AM Yash Mayya <yash.ma...@gmail.com> wrote: > > > Hi Chris, > > > > Sorry for the late reply. > > > > > I don't believe logging an error message is sufficient for > > > handling failures to reset-after-delete. IMO it's highly > > > likely that users will either shoot themselves in the foot > > > by not reading the fine print and realizing that the offset > > > request may have failed, or will ask for better visibility > > > into the success or failure of the reset request than > > > scanning log files. > > > > Your reasoning for deferring the reset offsets after delete functionality > > to a separate KIP makes sense, thanks for the explanation. > > > > > I've updated the KIP with the > > > developer-facing API changes for this logic > > > > This is great, I hadn't considered the other two (very valid) use-cases > for > > such an API, thanks for adding these with elaborate documentation! > However, > > the significance / use of the boolean value returned by the two methods > is > > not fully clear, could you please clarify? > > > > Thanks, > > Yash > > > > On Fri, Nov 18, 2022 at 1:06 AM Chris Egerton <chr...@aiven.io.invalid> > > wrote: > > > > > Hi Yash, > > > > > > I've updated the KIP with the correct "kafka_topic", "kafka_partition", > > and > > > "kafka_offset" keys in the JSON examples (settled on those instead of > > > prefixing with "Kafka " for better interactions with tooling like JQ). 
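(Aside for readers without the KIP open: with those keys, the body returned by GET /connectors/{connector}/offsets for a sink connector would look roughly like the example below. The connector data and values are invented, and the exact envelope is whatever the KIP's examples show; for source connectors the contents of the "partition" and "offset" objects are defined by each connector, e.g. a file name and byte position for a file-based source.)

    {
      "offsets": [
        {
          "partition": {
            "kafka_topic": "orders",
            "kafka_partition": 3
          },
          "offset": {
            "kafka_offset": 4521
          }
        }
      ]
    }

As noted further down in the quoted history, the "type" field was dropped from this response precisely so that the same "offsets" structure can be copy-pasted back as the request body when altering a connector's offsets.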
> > I've > > > also added a note about sink offset requests failing if there are still > > > active members in the consumer group. > > > > > > I don't believe logging an error message is sufficient for handling > > > failures to reset-after-delete. IMO it's highly likely that users will > > > either shoot themselves in the foot by not reading the fine print and > > > realizing that the offset request may have failed, or will ask for > better > > > visibility into the success or failure of the reset request than > scanning > > > log files. I don't doubt that there are ways to address this, but I > would > > > prefer to leave them to a separate KIP since the required design work > is > > > non-trivial and I do not feel that the added burden is worth tying to > > this > > > KIP as a blocker. > > > > > > I was really hoping to avoid introducing a change to the > developer-facing > > > APIs with this KIP, but after giving it some thought I think this may > be > > > unavoidable. It's debatable whether validation of altered offsets is a > > good > > > enough use case on its own for this kind of API, but since there are > also > > > connectors out there that manage offsets externally, we should probably > > add > > > a hook to allow those external offsets to be managed, which can then > > serve > > > double- or even-triple duty as a hook to validate custom offsets and to > > > notify users whether offset resets/alterations are supported at all > > (which > > > they may not be if, for example, offsets are coupled tightly with the > > data > > > written by a sink connector). I've updated the KIP with the > > > developer-facing API changes for this logic; let me know what you > think. > > > > > > Cheers, > > > > > > Chris > > > > > > On Mon, Nov 14, 2022 at 10:16 AM Mickael Maison < > > mickael.mai...@gmail.com> > > > wrote: > > > > > > > Hi Chris, > > > > > > > > Thanks for the update! > > > > > > > > It's relatively common to only want to reset offsets for a specific > > > > resource (for example with MirrorMaker for one or a group of topics). > > > > Could it be possible to add a way to do so? Either by providing a > > > > payload to DELETE or by setting the offset field to an empty object > in > > > > the PATCH payload? > > > > > > > > Thanks, > > > > Mickael > > > > > > > > On Sat, Nov 12, 2022 at 3:33 PM Yash Mayya <yash.ma...@gmail.com> > > wrote: > > > > > > > > > > Hi Chris, > > > > > > > > > > Thanks for pointing out that the consumer group deletion step > itself > > > will > > > > > fail in case of zombie sink tasks. Since we can't get any stronger > > > > > guarantees from consumers (unlike with transactional producers), I > > > think > > > > it > > > > > makes perfect sense to fail the offset reset attempt in such > > scenarios > > > > with > > > > > a relevant error message to the user. I was more concerned about > > > silently > > > > > failing but it looks like that won't be an issue. It's probably > worth > > > > > calling out this difference between source / sink connectors > > explicitly > > > > in > > > > > the KIP, what do you think? 
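To make the sink-side behaviour concrete, a check-then-delete could look something like the sketch below. This is purely illustrative (the class name, group-id handling, and error text are invented; the real logic lives in the worker), but the Admin API calls shown do exist:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;
    import org.apache.kafka.connect.errors.ConnectException;

    public class SinkOffsetResetSketch {

        // Illustrative only: fail fast if the sink connector's consumer group still has
        // active members (e.g. zombie tasks); otherwise delete the group, which wipes
        // its committed offsets.
        public static void resetSinkOffsets(Properties adminProps, String groupId) throws Exception {
            try (Admin admin = Admin.create(adminProps)) {
                ConsumerGroupDescription group = admin
                        .describeConsumerGroups(Collections.singleton(groupId))
                        .describedGroups()
                        .get(groupId)
                        .get();
                if (!group.members().isEmpty()) {
                    throw new ConnectException(
                            "Cannot reset offsets for consumer group " + groupId + ": the group still has "
                            + group.members().size() + " active member(s). Wait for task shutdown to "
                            + "complete, or restart workers that may be running zombie tasks, then retry.");
                }
                admin.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
            }
        }
    }

Even this check is racy (a zombie member can rejoin between the describe and the delete), but in that case the delete itself fails with a group-not-empty error, so the request still fails visibly rather than silently, which matches the behaviour described above.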
> > > > > > > > > > > changing the field names for sink offsets > > > > > > from "topic", "partition", and "offset" to "Kafka > > > > > > topic", "Kafka partition", and "Kafka offset" respectively, to > > > > > > reduce the stuttering effect of having a "partition" field inside > > > > > > a "partition" field and the same with an "offset" field > > > > > > > > > > The KIP is still using the nested partition / offset fields by the > > way > > > - > > > > > has it not been updated because we're waiting for consensus on the > > > field > > > > > names? > > > > > > > > > > > The reset-after-delete feature, on the other > > > > > > hand, is actually pretty tricky to design; I've updated the > > > > > > rationale in the KIP for delaying it and clarified that it's not > > > > > > just a matter of implementation but also design work. > > > > > > > > > > I like the idea of writing an offset reset request to the config > > topic > > > > > which will be processed by the herder's config update listener - > I'm > > > not > > > > > sure I fully follow the concerns with regard to handling failures? > > Why > > > > > can't we simply log an error saying that the offset reset for the > > > deleted > > > > > connector "xyz" failed due to reason "abc"? As long as it's > > documented > > > > that > > > > > connector deletion and offset resets are asynchronous and a success > > > > > response only means that the request was initiated successfully > > (which > > > is > > > > > the case even today with normal connector deletion), we should be > > fine > > > > > right? > > > > > > > > > > Thanks for adding the new PATCH endpoint to the KIP, I think it's a > > lot > > > > > more useful for this use case than a PUT endpoint would be! One > thing > > > > > that I was thinking about with the new PATCH endpoint is that while > > we > > > > can > > > > > easily validate the request body format for sink connectors (since > > it's > > > > the > > > > > same across all connectors), we can't do the same for source > > connectors > > > > as > > > > > things stand today since each source connector implementation can > > > define > > > > > its own source partition and offset structures. Without any > > validation, > > > > > writing a bad offset for a source connector via the PATCH endpoint > > > could > > > > > cause it to fail with hard to discern errors. I'm wondering if we > > could > > > > add > > > > > a new method to the `SourceConnector` class (which should be > > overridden > > > > by > > > > > source connector implementations) that would validate whether or > not > > > the > > > > > provided source partitions and source offsets are valid for the > > > connector > > > > > (it could have a default implementation returning true > > unconditionally > > > > for > > > > > backward compatibility). > > > > > > > > > > > I've also added an implementation plan to the KIP, which calls > > > > > > out the different parts that can be worked on independently so > that > > > > > > others (hi Yash 🙂) can also tackle parts of this if they'd like. > > > > > > > > > > I'd be more than happy to pick up one or more of the implementation > > > > parts, > > > > > thanks for breaking it up into granular pieces! > > > > > > > > > > Thanks, > > > > > Yash > > > > > > > > > > On Fri, Nov 11, 2022 at 11:25 PM Chris Egerton > > <chr...@aiven.io.invalid > > > > > > > > > wrote: > > > > > > > > > > > Hi Mickael, > > > > > > > > > > > > Thanks for your feedback. This has been on my TODO list as well > :) > > > > > > > > > > > > 1. That's fair! 
Support for altering offsets is easy enough to > > > design, > > > > so > > > > > > I've added it to the KIP. The reset-after-delete feature, on the > > > other > > > > > > hand, is actually pretty tricky to design; I've updated the > > rationale > > > > in > > > > > > the KIP for delaying it and clarified that it's not just a matter > > of > > > > > > implementation but also design work. If you or anyone else can > > think > > > > of a > > > > > > clean, simple way to implement it, I'm happy to add it to this > KIP, > > > but > > > > > > otherwise I'd prefer not to tie it to the approval and release of > > the > > > > > > features already proposed in the KIP. > > > > > > > > > > > > 2. Yeah, it's a little awkward. In my head I've justified the > > > ugliness > > > > of > > > > > > the implementation with the smooth user-facing experience; > falling > > > back > > > > > > seamlessly on the PAUSED state without even logging an error > > message > > > > is a > > > > > > lot better than I'd initially hoped for when I was designing this > > > > feature. > > > > > > > > > > > > I've also added an implementation plan to the KIP, which calls > out > > > the > > > > > > different parts that can be worked on independently so that > others > > > (hi > > > > Yash > > > > > > 🙂) can also tackle parts of this if they'd like. > > > > > > > > > > > > Finally, I've removed the "type" field from the response body > > format > > > > for > > > > > > offset read requests. This way, users can copy+paste the response > > > from > > > > that > > > > > > endpoint into a request to alter a connector's offsets without > > having > > > > to > > > > > > remove the "type" field first. An alternative was to keep the > > "type" > > > > field > > > > > > and add it to the request body format for altering offsets, but > > this > > > > didn't > > > > > > seem to make enough sense for cases not involving the > > aforementioned > > > > > > copy+paste process. > > > > > > > > > > > > Cheers, > > > > > > > > > > > > Chris > > > > > > > > > > > > On Wed, Nov 9, 2022 at 9:57 AM Mickael Maison < > > > > mickael.mai...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > Thanks for the KIP, you're picking something that has been in > my > > > todo > > > > > > > list for a while ;) > > > > > > > > > > > > > > It looks good overall, I just have a couple of questions: > > > > > > > 1) I consider both features listed in Future Work pretty > > important. > > > > In > > > > > > > both cases you mention the reason for not addressing them now > is > > > > > > > because of the implementation. If the design is simple and if > we > > > have > > > > > > > volunteers to implement them, I wonder if we could include them > > in > > > > > > > this KIP. So you would not have to implement everything but we > > > would > > > > > > > have a single KIP and vote. > > > > > > > > > > > > > > 2) Regarding the backward compatibility for the stopped state. > > The > > > > > > > "state.v2" field is a bit unfortunate but I can't think of a > > better > > > > > > > solution. The other alternative would be to not do anything > but I > > > > > > > think the graceful degradation you propose is a bit better. 
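(To make the graceful degradation concrete: as I read the KIP, the target state record written to the config topic for a stopped connector carries both fields, roughly as below; the exact values are whatever the KIP specifies. Workers that predate the new state only read "state" and therefore treat the connector as paused, while upgraded workers prefer "state.v2" and treat it as stopped.)

    {
      "state": "PAUSED",
      "state.v2": "STOPPED"
    }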
> > > > > > > > > > > > > > Thanks, > > > > > > > Mickael > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Tue, Nov 8, 2022 at 5:58 PM Chris Egerton > > > <chr...@aiven.io.invalid > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > > > > > Good question! This is actually a subtle source of asymmetry > in > > > the > > > > > > > current > > > > > > > > proposal. Requests to delete a consumer group with active > > members > > > > will > > > > > > > > fail, so if there are zombie sink tasks that are still > > > > communicating > > > > > > with > > > > > > > > Kafka, offset reset requests for that connector will also > fail. > > > It > > > > is > > > > > > > > possible to use an admin client to remove all active members > > from > > > > the > > > > > > > group > > > > > > > > and then delete the group. However, this solution isn't as > > > > complete as > > > > > > > the > > > > > > > > zombie fencing that we can perform for exactly-once source > > tasks, > > > > since > > > > > > > > removing consumers from a group doesn't prevent them from > > > > immediately > > > > > > > > rejoining the group, which would either cause the group > > deletion > > > > > > request > > > > > > > to > > > > > > > > fail (if they rejoin before the group is deleted), or > recreate > > > the > > > > > > group > > > > > > > > (if they rejoin after the group is deleted). > > > > > > > > > > > > > > > > For ease of implementation, I'd prefer to leave the asymmetry > > in > > > > the > > > > > > API > > > > > > > > for now and fail fast and clearly if there are still > consumers > > > > active > > > > > > in > > > > > > > > the sink connector's group. We can try to detect this case > and > > > > provide > > > > > > a > > > > > > > > helpful error message to the user explaining why the offset > > reset > > > > > > request > > > > > > > > has failed and some steps they can take to try to resolve > > things > > > > (wait > > > > > > > for > > > > > > > > slow task shutdown to complete, restart zombie workers and/or > > > > workers > > > > > > > with > > > > > > > > blocked tasks on them). In the future we can possibly even > > > revisit > > > > > > > KIP-611 > > > > > > > > [1] or something like it to provide better insight into > zombie > > > > tasks > > > > > > on a > > > > > > > > worker so that it's easier to find which tasks have been > > > abandoned > > > > but > > > > > > > are > > > > > > > > still running. > > > > > > > > > > > > > > > > Let me know what you think; this is an important point to > call > > > out > > > > and > > > > > > if > > > > > > > > we can reach some consensus on how to handle sink connector > > > offset > > > > > > resets > > > > > > > > w/r/t zombie tasks, I'll update the KIP with the details. > > > > > > > > > > > > > > > > [1] - > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya < > > yash.ma...@gmail.com> > > > > > > wrote: > > > > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > > > > > Thanks for the response and the explanations, I think > you've > > > > answered > > > > > > > > > pretty much all the questions I had meticulously! 
> > > > > > > > > > > > > > > > > > > > > > > > > > > > if something goes wrong while resetting offsets, there's > no > > > > > > > > > > immediate impact--the connector will still be in the > > STOPPED > > > > > > > > > > state. The REST response for requests to reset the > offsets > > > > > > > > > > will clearly call out that the operation has failed, and > if > > > > > > > necessary, > > > > > > > > > > we can probably also add a scary-looking warning message > > > > > > > > > > stating that we can't guarantee which offsets have been > > > > > > successfully > > > > > > > > > > wiped and which haven't. Users can query the exact > offsets > > > of > > > > > > > > > > the connector at this point to determine what will happen > > > > if/what > > > > > > > they > > > > > > > > > > resume it. And they can repeat attempts to reset the > > offsets > > > as > > > > > > many > > > > > > > > > > times as they'd like until they get back a 2XX response, > > > > > > indicating > > > > > > > > > > that it's finally safe to resume the connector. Thoughts? > > > > > > > > > > > > > > > > > > Yeah, I agree, the case that I mentioned earlier where a > user > > > > would > > > > > > > try to > > > > > > > > > resume a stopped connector after a failed offset reset > > attempt > > > > > > without > > > > > > > > > knowing that the offset reset attempt didn't fail cleanly > is > > > > probably > > > > > > > just > > > > > > > > > an extreme edge case. I think as long as the response is > > > verbose > > > > > > > enough and > > > > > > > > > self explanatory, we should be fine. > > > > > > > > > > > > > > > > > > Another question that I had was behavior w.r.t sink > connector > > > > offset > > > > > > > resets > > > > > > > > > when there are zombie tasks/workers in the Connect cluster > - > > > the > > > > KIP > > > > > > > > > mentions that for sink connectors offset resets will be > done > > by > > > > > > > deleting > > > > > > > > > the consumer group. However, if there are zombie tasks > which > > > are > > > > > > still > > > > > > > able > > > > > > > > > to communicate with the Kafka cluster that the sink > connector > > > is > > > > > > > consuming > > > > > > > > > from, I think the consumer group will automatically get > > > > re-created > > > > > > and > > > > > > > the > > > > > > > > > zombie task may be able to commit offsets for the > partitions > > > > that it > > > > > > is > > > > > > > > > consuming from? > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > > > > > > > > > Thanks again for your thoughts! Responses to ongoing > > > > discussions > > > > > > > inline > > > > > > > > > > (easier to track context than referencing comment > numbers): > > > > > > > > > > > > > > > > > > > > > However, this then leads me to wonder if we can make > that > > > > > > explicit > > > > > > > by > > > > > > > > > > including "connect" or "connector" in the higher level > > field > > > > names? > > > > > > > Or do > > > > > > > > > > you think this isn't required given that we're talking > > about > > > a > > > > > > > Connect > > > > > > > > > > specific REST API in the first place? 
> > > > > > > > > > > > > > > > > > > > I think "partition" and "offset" are fine as field names > > but > > > > I'm > > > > > > not > > > > > > > > > hugely > > > > > > > > > > opposed to adding "connector " as a prefix to them; would > > be > > > > > > > interested > > > > > > > > > in > > > > > > > > > > others' thoughts. > > > > > > > > > > > > > > > > > > > > > I'm not sure I followed why the unresolved writes to > the > > > > config > > > > > > > topic > > > > > > > > > > would be an issue - wouldn't the delete offsets request > be > > > > added to > > > > > > > the > > > > > > > > > > herder's request queue and whenever it is processed, we'd > > > > anyway > > > > > > > need to > > > > > > > > > > check if all the prerequisites for the request are > > satisfied? > > > > > > > > > > > > > > > > > > > > Some requests are handled in multiple steps. For example, > > > > deleting > > > > > > a > > > > > > > > > > connector (1) adds a request to the herder queue to > write a > > > > > > > tombstone to > > > > > > > > > > the config topic (or, if the worker isn't the leader, > > forward > > > > the > > > > > > > request > > > > > > > > > > to the leader). (2) Once that tombstone is picked up, > (3) a > > > > > > rebalance > > > > > > > > > > ensues, and then after it's finally complete, (4) the > > > > connector and > > > > > > > its > > > > > > > > > > tasks are shut down. I probably could have used better > > > > terminology, > > > > > > > but > > > > > > > > > > what I meant by "unresolved writes to the config topic" > > was a > > > > case > > > > > > in > > > > > > > > > > between steps (2) and (3)--where the worker has already > > read > > > > that > > > > > > > > > tombstone > > > > > > > > > > from the config topic and knows that a rebalance is > > pending, > > > > but > > > > > > > hasn't > > > > > > > > > > begun participating in that rebalance yet. In the > > > > DistributedHerder > > > > > > > > > class, > > > > > > > > > > this is done via the `checkRebalanceNeeded` method. > > > > > > > > > > > > > > > > > > > > > We can probably revisit this potential deprecation [of > > the > > > > PAUSED > > > > > > > > > state] > > > > > > > > > > in the future based on user feedback and how the adoption > > of > > > > the > > > > > > new > > > > > > > > > > proposed stop endpoint looks like, what do you think? > > > > > > > > > > > > > > > > > > > > Yeah, revisiting in the future seems reasonable. 👍 > > > > > > > > > > > > > > > > > > > > And responses to new comments here: > > > > > > > > > > > > > > > > > > > > 8. Yep, we'll start tracking offsets by connector. I > don't > > > > believe > > > > > > > this > > > > > > > > > > should be too difficult, and suspect that the only reason > > we > > > > track > > > > > > > raw > > > > > > > > > byte > > > > > > > > > > arrays instead of pre-deserializing offset topic > > information > > > > into > > > > > > > > > something > > > > > > > > > > more useful is because Connect originally had pluggable > > > > internal > > > > > > > > > > converters. Now that we're hardcoded to use the JSON > > > converter > > > > it > > > > > > > should > > > > > > > > > be > > > > > > > > > > fine to track offsets on a per-connector basis as they're > > > read > > > > from > > > > > > > the > > > > > > > > > > offsets topic. > > > > > > > > > > > > > > > > > > > > 9. I'm hesitant to introduce this type of feature right > now > > > > because > > > > > > > of > > > > > > > > > all > > > > > > > > > > of the gotchas that would come with it. 
In > > security-conscious > > > > > > > > > environments, > > > > > > > > > > it's possible that a sink connector's principal may have > > > > access to > > > > > > > the > > > > > > > > > > consumer group used by the connector, but the worker's > > > > principal > > > > > > may > > > > > > > not. > > > > > > > > > > There's also the case where source connectors have > separate > > > > offsets > > > > > > > > > topics, > > > > > > > > > > or sink connectors have overridden consumer group IDs, or > > > sink > > > > or > > > > > > > source > > > > > > > > > > connectors work against a different Kafka cluster than > the > > > one > > > > that > > > > > > > their > > > > > > > > > > worker uses. Overall, I'd rather provide a single API > that > > > > works in > > > > > > > all > > > > > > > > > > cases rather than risk confusing and alienating users by > > > > trying to > > > > > > > make > > > > > > > > > > their lives easier in a subset of cases. > > > > > > > > > > > > > > > > > > > > 10. Hmm... I don't think the order of the writes matters > > too > > > > much > > > > > > > here, > > > > > > > > > but > > > > > > > > > > we probably could start by deleting from the global topic > > > > first, > > > > > > > that's > > > > > > > > > > true. The reason I'm not hugely concerned about this case > > is > > > > that > > > > > > if > > > > > > > > > > something goes wrong while resetting offsets, there's no > > > > immediate > > > > > > > > > > impact--the connector will still be in the STOPPED state. > > The > > > > REST > > > > > > > > > response > > > > > > > > > > for requests to reset the offsets will clearly call out > > that > > > > the > > > > > > > > > operation > > > > > > > > > > has failed, and if necessary, we can probably also add a > > > > > > > scary-looking > > > > > > > > > > warning message stating that we can't guarantee which > > offsets > > > > have > > > > > > > been > > > > > > > > > > successfully wiped and which haven't. Users can query the > > > exact > > > > > > > offsets > > > > > > > > > of > > > > > > > > > > the connector at this point to determine what will happen > > > > if/what > > > > > > > they > > > > > > > > > > resume it. And they can repeat attempts to reset the > > offsets > > > as > > > > > > many > > > > > > > > > times > > > > > > > > > > as they'd like until they get back a 2XX response, > > indicating > > > > that > > > > > > > it's > > > > > > > > > > finally safe to resume the connector. Thoughts? > > > > > > > > > > > > > > > > > > > > 11. I haven't thought too much about it. I think > something > > > > like the > > > > > > > > > > Monitorable* connectors would probably serve our needs > > here; > > > > we can > > > > > > > > > > instantiate them on a running Connect cluster and then > use > > > > various > > > > > > > > > handles > > > > > > > > > > to know how many times they've been polled, committed > > > records, > > > > etc. > > > > > > > If > > > > > > > > > > necessary we can tweak those classes or even write our > own. > > > But > > > > > > > anyways, > > > > > > > > > > once that's all done, the test will be something like > > > "create a > > > > > > > > > connector, > > > > > > > > > > wait for it to produce N records (each of which contains > > some > > > > kind > > > > > > of > > > > > > > > > > predictable offset), and ensure that the offsets for it > in > > > the > > > > REST > > > > > > > API > > > > > > > > > > match up with the ones we'd expect from N records". 
Does > > that > > > > > > answer > > > > > > > your > > > > > > > > > > question? > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya < > > > > yash.ma...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > > > > > > > > > 1. Thanks a lot for elaborating on this, I'm now > > convinced > > > > about > > > > > > > the > > > > > > > > > > > usefulness of the new offset reset endpoint. Regarding > > the > > > > > > > follow-up > > > > > > > > > KIP > > > > > > > > > > > for a fine-grained offset write API, I'd be happy to > take > > > > that on > > > > > > > once > > > > > > > > > > this > > > > > > > > > > > KIP is finalized and I will definitely look forward to > > your > > > > > > > feedback on > > > > > > > > > > > that one! > > > > > > > > > > > > > > > > > > > > > > 2. Gotcha, the motivation makes more sense to me now. > So > > > the > > > > > > higher > > > > > > > > > level > > > > > > > > > > > partition field represents a Connect specific "logical > > > > partition" > > > > > > > of > > > > > > > > > > sorts > > > > > > > > > > > - i.e. the source partition as defined by a connector > for > > > > source > > > > > > > > > > connectors > > > > > > > > > > > and a Kafka topic + partition for sink connectors. I > like > > > the > > > > > > idea > > > > > > > of > > > > > > > > > > > adding a Kafka prefix to the lower level > partition/offset > > > > (and > > > > > > > topic) > > > > > > > > > > > fields which basically makes it more clear (although > > > > implicitly) > > > > > > > that > > > > > > > > > the > > > > > > > > > > > higher level partition/offset field is Connect specific > > and > > > > not > > > > > > the > > > > > > > > > same > > > > > > > > > > as > > > > > > > > > > > what those terms represent in Kafka itself. However, > this > > > > then > > > > > > > leads me > > > > > > > > > > to > > > > > > > > > > > wonder if we can make that explicit by including > > "connect" > > > or > > > > > > > > > "connector" > > > > > > > > > > > in the higher level field names? Or do you think this > > isn't > > > > > > > required > > > > > > > > > > given > > > > > > > > > > > that we're talking about a Connect specific REST API in > > the > > > > first > > > > > > > > > place? > > > > > > > > > > > > > > > > > > > > > > 3. Thanks, I think the response structure definitely > > looks > > > > better > > > > > > > now! > > > > > > > > > > > > > > > > > > > > > > 4. Interesting, I'd be curious to learn why we might > want > > > to > > > > > > change > > > > > > > > > this > > > > > > > > > > in > > > > > > > > > > > the future but that's probably out of scope for this > > > > discussion. > > > > > > > I'm > > > > > > > > > not > > > > > > > > > > > sure I followed why the unresolved writes to the config > > > topic > > > > > > > would be > > > > > > > > > an > > > > > > > > > > > issue - wouldn't the delete offsets request be added to > > the > > > > > > > herder's > > > > > > > > > > > request queue and whenever it is processed, we'd anyway > > > need > > > > to > > > > > > > check > > > > > > > > > if > > > > > > > > > > > all the prerequisites for the request are satisfied? > > > > > > > > > > > > > > > > > > > > > > 5. 
Thanks for elaborating that just fencing out the > > > producer > > > > > > still > > > > > > > > > leaves > > > > > > > > > > > many cases where source tasks remain hanging around and > > > also > > > > that > > > > > > > we > > > > > > > > > > anyway > > > > > > > > > > > can't have similar data production guarantees for sink > > > > connectors > > > > > > > right > > > > > > > > > > > now. I agree that it might be better to go with ease of > > > > > > > implementation > > > > > > > > > > and > > > > > > > > > > > consistency for now. > > > > > > > > > > > > > > > > > > > > > > 6. Right, that does make sense but I still feel like > the > > > two > > > > > > states > > > > > > > > > will > > > > > > > > > > > end up being confusing to end users who might not be > able > > > to > > > > > > > discern > > > > > > > > > the > > > > > > > > > > > (fairly low-level) differences between them (also the > > > > nuances of > > > > > > > state > > > > > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED > > > with > > > > the > > > > > > > > > > > rebalancing implications as well). We can probably > > revisit > > > > this > > > > > > > > > potential > > > > > > > > > > > deprecation in the future based on user feedback and > how > > > the > > > > > > > adoption > > > > > > > > > of > > > > > > > > > > > the new proposed stop endpoint looks like, what do you > > > think? > > > > > > > > > > > > > > > > > > > > > > 7. Aha, that is completely my bad, I missed that the > > v1/v2 > > > > state > > > > > > is > > > > > > > > > only > > > > > > > > > > > applicable to the connector's target state and that we > > > don't > > > > need > > > > > > > to > > > > > > > > > > worry > > > > > > > > > > > about the tasks since we will have an empty set of > > tasks. I > > > > > > think I > > > > > > > > > was a > > > > > > > > > > > little confused by "pause the parts of the connector > that > > > > they > > > > > > are > > > > > > > > > > > assigned" from the KIP. Thanks for clarifying that! > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Some more thoughts and questions that I had - > > > > > > > > > > > > > > > > > > > > > > 8. Could you elaborate on what the implementation for > > > offset > > > > > > reset > > > > > > > for > > > > > > > > > > > source connectors would look like? Currently, it > doesn't > > > look > > > > > > like > > > > > > > we > > > > > > > > > > track > > > > > > > > > > > all the partitions for a source connector anywhere. > Will > > we > > > > need > > > > > > to > > > > > > > > > > > book-keep this somewhere in order to be able to emit a > > > > tombstone > > > > > > > record > > > > > > > > > > for > > > > > > > > > > > each source partition? > > > > > > > > > > > > > > > > > > > > > > 9. The KIP describes the offset reset endpoint as only > > > being > > > > > > > usable on > > > > > > > > > > > existing connectors that are in a `STOPPED` state. Why > > > > wouldn't > > > > > > we > > > > > > > want > > > > > > > > > > to > > > > > > > > > > > allow resetting offsets for a deleted connector which > > seems > > > > to > > > > > > be a > > > > > > > > > valid > > > > > > > > > > > use case? Or do we plan to handle this use case only > via > > > the > > > > item > > > > > > > > > > outlined > > > > > > > > > > > in the future work section - "Automatically delete > > offsets > > > > with > > > > > > > > > > > connectors"? > > > > > > > > > > > > > > > > > > > > > > 10. 
The KIP mentions that source offsets will be reset > > > > > > > transactionally > > > > > > > > > > for > > > > > > > > > > > each topic (worker global offset topic and connector > > > specific > > > > > > > offset > > > > > > > > > > topic > > > > > > > > > > > if it exists). While it obviously isn't possible to > > > > atomically do > > > > > > > the > > > > > > > > > > > writes to two topics which may be on different Kafka > > > > clusters, > > > > > > I'm > > > > > > > > > > > wondering about what would happen if the first > > transaction > > > > > > > succeeds but > > > > > > > > > > the > > > > > > > > > > > second one fails. I think the order of the two > > transactions > > > > > > matters > > > > > > > > > here > > > > > > > > > > - > > > > > > > > > > > if we successfully emit tombstones to the connector > > > specific > > > > > > offset > > > > > > > > > topic > > > > > > > > > > > and fail to do so for the worker global offset topic, > > we'll > > > > > > > presumably > > > > > > > > > > fail > > > > > > > > > > > the offset delete request because the KIP mentions that > > "A > > > > > > request > > > > > > > to > > > > > > > > > > reset > > > > > > > > > > > offsets for a source connector will only be considered > > > > successful > > > > > > > if > > > > > > > > > the > > > > > > > > > > > worker is able to delete all known offsets for that > > > > connector, on > > > > > > > both > > > > > > > > > > the > > > > > > > > > > > worker's global offsets topic and (if one is used) the > > > > > > connector's > > > > > > > > > > > dedicated offsets topic.". However, this will lead to > the > > > > > > connector > > > > > > > > > only > > > > > > > > > > > being able to read potentially older offsets from the > > > worker > > > > > > global > > > > > > > > > > offset > > > > > > > > > > > topic on resumption (based on the combined offset view > > > > presented > > > > > > as > > > > > > > > > > > described in KIP-618 [1]). So, I think we should make > > sure > > > > that > > > > > > the > > > > > > > > > > worker > > > > > > > > > > > global offset topic tombstoning is attempted first, > > right? > > > > Note > > > > > > > that in > > > > > > > > > > the > > > > > > > > > > > current implementation of > > > > `ConnectorOffsetBackingStore::set`, the > > > > > > > > > > primary / > > > > > > > > > > > connector specific offset store is written to first. > > > > > > > > > > > > > > > > > > > > > > 11. This probably isn't necessary to elaborate on in > the > > > KIP > > > > > > > itself, > > > > > > > > > but > > > > > > > > > > I > > > > > > > > > > > was wondering what the second offset test - "verify > that > > > that > > > > > > those > > > > > > > > > > offsets > > > > > > > > > > > reflect an expected level of progress for each > connector > > > > (i.e., > > > > > > > they > > > > > > > > > are > > > > > > > > > > > greater than or equal to a certain value depending on > how > > > the > > > > > > > > > connectors > > > > > > > > > > > are configured and how long they have been running)" - > > > would > > > > look > > > > > > > like? 
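To spell out the ordering suggested in point 10 above, here is a purely illustrative sketch; none of these helper methods exist in the Connect runtime (the real logic would live around ConnectorOffsetBackingStore), they only make the argument concrete:

    import java.util.Map;
    import java.util.Set;

    // Purely illustrative -- every helper below is invented for this sketch.
    public class SourceOffsetResetOrderingSketch {

        void resetSourceOffsets(String connector) {
            Set<Map<String, ?>> partitions = knownSourcePartitions(connector);

            // 1. Tombstone the worker's *global* offsets topic first, in one transaction.
            //    If this step fails, nothing has been wiped yet and the request simply fails.
            writeTombstonesTransactionally(globalOffsetsTopic(), partitions);

            // 2. Only then tombstone the connector's dedicated offsets topic (if any).
            //    If this second step fails, the dedicated topic -- which takes precedence
            //    in the combined offset view -- still holds the connector's most recent
            //    offsets, so resuming the connector cannot fall back to stale offsets
            //    from the global topic.
            if (hasDedicatedOffsetsTopic(connector)) {
                writeTombstonesTransactionally(dedicatedOffsetsTopic(connector), partitions);
            }
        }

        // Hypothetical helpers, invented for this sketch:
        Set<Map<String, ?>> knownSourcePartitions(String connector) { throw new UnsupportedOperationException(); }
        String globalOffsetsTopic() { throw new UnsupportedOperationException(); }
        String dedicatedOffsetsTopic(String connector) { throw new UnsupportedOperationException(); }
        boolean hasDedicatedOffsetsTopic(String connector) { throw new UnsupportedOperationException(); }
        void writeTombstonesTransactionally(String topic, Set<Map<String, ?>> partitions) { throw new UnsupportedOperationException(); }
    }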
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > > > > > [1] - > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration > > > > > > > > > > > > > > > > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton > > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your detailed thoughts. > > > > > > > > > > > > > > > > > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly > > > what's > > > > > > > proposed > > > > > > > > > in > > > > > > > > > > > the > > > > > > > > > > > > KIP right now: a way to reset offsets for connectors. > > > Sure, > > > > > > > there's > > > > > > > > > an > > > > > > > > > > > > extra step of stopping the connector, but renaming a > > > > connector > > > > > > > isn't > > > > > > > > > as > > > > > > > > > > > > convenient of an alternative as it may seem since in > > many > > > > cases > > > > > > > you'd > > > > > > > > > > > also > > > > > > > > > > > > want to delete the older one, so the complete > sequence > > of > > > > steps > > > > > > > would > > > > > > > > > > be > > > > > > > > > > > > something like delete the old connector, rename it > > > > (possibly > > > > > > > > > requiring > > > > > > > > > > > > modifications to its config file, depending on which > > API > > > is > > > > > > > used), > > > > > > > > > then > > > > > > > > > > > > create the renamed variant. It's also just not a > great > > > user > > > > > > > > > > > > experience--even if the practical impacts are limited > > > > (which, > > > > > > > IMO, > > > > > > > > > they > > > > > > > > > > > are > > > > > > > > > > > > not), people have been asking for years about why > they > > > > have to > > > > > > > employ > > > > > > > > > > > this > > > > > > > > > > > > kind of a workaround for a fairly common use case, > and > > we > > > > don't > > > > > > > > > really > > > > > > > > > > > have > > > > > > > > > > > > a good answer beyond "we haven't implemented > something > > > > better > > > > > > > yet". > > > > > > > > > On > > > > > > > > > > > top > > > > > > > > > > > > of that, you may have external tooling that needs to > be > > > > tweaked > > > > > > > to > > > > > > > > > > > handle a > > > > > > > > > > > > new connector name, you may have strict authorization > > > > policies > > > > > > > around > > > > > > > > > > who > > > > > > > > > > > > can access what connectors, you may have other ACLs > > > > attached to > > > > > > > the > > > > > > > > > > name > > > > > > > > > > > of > > > > > > > > > > > > the connector (which can be especially common in the > > case > > > > of > > > > > > sink > > > > > > > > > > > > connectors, whose consumer group IDs are tied to > their > > > > names by > > > > > > > > > > default), > > > > > > > > > > > > and leaving around state in the offsets topic that > can > > > > never be > > > > > > > > > cleaned > > > > > > > > > > > up > > > > > > > > > > > > presents a bit of a footgun for users. 
It may not be > a > > > > silver > > > > > > > bullet, > > > > > > > > > > but > > > > > > > > > > > > providing some mechanism to reset that state is a > step > > in > > > > the > > > > > > > right > > > > > > > > > > > > direction and allows responsible users to more > > carefully > > > > > > > administer > > > > > > > > > > their > > > > > > > > > > > > cluster without resorting to non-public APIs. That > > said, > > > I > > > > do > > > > > > > agree > > > > > > > > > > that > > > > > > > > > > > a > > > > > > > > > > > > fine-grained reset/overwrite API would be useful, and > > I'd > > > > be > > > > > > > happy to > > > > > > > > > > > > review a KIP to add that feature if anyone wants to > > > tackle > > > > it! > > > > > > > > > > > > > > > > > > > > > > > > 2. Keeping the two formats symmetrical is motivated > > > mostly > > > > by > > > > > > > > > > aesthetics > > > > > > > > > > > > and quality-of-life for programmatic interaction with > > the > > > > API; > > > > > > > it's > > > > > > > > > not > > > > > > > > > > > > really a goal to hide the use of consumer groups from > > > > users. I > > > > > > do > > > > > > > > > agree > > > > > > > > > > > > that the format is a little strange-looking for sink > > > > > > connectors, > > > > > > > but > > > > > > > > > it > > > > > > > > > > > > seemed like it would be easier to work with for UIs, > > > > casual jq > > > > > > > > > queries, > > > > > > > > > > > and > > > > > > > > > > > > CLIs than a more Kafka-specific alternative such as > > > > > > > > > > > {"<topic>-<partition>": > > > > > > > > > > > > "<offset>"}, and although it is a little strange, I > > don't > > > > think > > > > > > > it's > > > > > > > > > > any > > > > > > > > > > > > less readable or intuitive. That said, I've made some > > > > tweaks to > > > > > > > the > > > > > > > > > > > format > > > > > > > > > > > > that should make programmatic access even easier; > > > > specifically, > > > > > > > I've > > > > > > > > > > > > removed the "source" and "sink" wrapper fields and > > > instead > > > > > > moved > > > > > > > them > > > > > > > > > > > into > > > > > > > > > > > > a top-level object with a "type" and "offsets" field, > > > just > > > > like > > > > > > > you > > > > > > > > > > > > suggested in point 3 (thanks!). We might also > consider > > > > changing > > > > > > > the > > > > > > > > > > field > > > > > > > > > > > > names for sink offsets from "topic", "partition", and > > > > "offset" > > > > > > to > > > > > > > > > > "Kafka > > > > > > > > > > > > topic", "Kafka partition", and "Kafka offset" > > > > respectively, to > > > > > > > reduce > > > > > > > > > > the > > > > > > > > > > > > stuttering effect of having a "partition" field > inside > > a > > > > > > > "partition" > > > > > > > > > > > field > > > > > > > > > > > > and the same with an "offset" field; thoughts? 
One > > final > > > > > > > point--by > > > > > > > > > > > equating > > > > > > > > > > > > source and sink offsets, we probably make it easier > for > > > > users > > > > > > to > > > > > > > > > > > understand > > > > > > > > > > > > exactly what a source offset is; anyone who's > familiar > > > with > > > > > > > consumer > > > > > > > > > > > > offsets can see from the response format that we > > > identify a > > > > > > > logical > > > > > > > > > > > > partition as a combination of two entities (a topic > > and a > > > > > > > partition > > > > > > > > > > > > number); it should make it easier to grok what a > source > > > > offset > > > > > > > is by > > > > > > > > > > > seeing > > > > > > > > > > > > what the two formats have in common. > > > > > > > > > > > > > > > > > > > > > > > > 3. Great idea! Done. > > > > > > > > > > > > > > > > > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the > > > > response > > > > > > > status > > > > > > > > > > if > > > > > > > > > > > a > > > > > > > > > > > > rebalance is pending. I'd rather not add this to the > > KIP > > > > as we > > > > > > > may > > > > > > > > > want > > > > > > > > > > > to > > > > > > > > > > > > change it at some point and it doesn't seem vital to > > > > establish > > > > > > > it as > > > > > > > > > > part > > > > > > > > > > > > of the public contract for the new endpoint right > now. > > > > Also, > > > > > > > small > > > > > > > > > > > > point--yes, a 409 is useful to avoid forwarding > > requests > > > > to an > > > > > > > > > > incorrect > > > > > > > > > > > > leader, but it's also useful to ensure that there > > aren't > > > > any > > > > > > > > > unresolved > > > > > > > > > > > > writes to the config topic that might cause issues > with > > > the > > > > > > > request > > > > > > > > > > (such > > > > > > > > > > > > as deleting the connector). > > > > > > > > > > > > > > > > > > > > > > > > 5. That's a good point--it may be misleading to call > a > > > > > > connector > > > > > > > > > > STOPPED > > > > > > > > > > > > when it has zombie tasks lying around on the > cluster. I > > > > don't > > > > > > > think > > > > > > > > > > it'd > > > > > > > > > > > be > > > > > > > > > > > > appropriate to do this synchronously while handling > > > > requests to > > > > > > > the > > > > > > > > > PUT > > > > > > > > > > > > /connectors/{connector}/stop since we'd want to give > > all > > > > > > > > > > > currently-running > > > > > > > > > > > > tasks a chance to gracefully shut down, though. I'm > > also > > > > not > > > > > > sure > > > > > > > > > that > > > > > > > > > > > this > > > > > > > > > > > > is a significant problem, either. If the connector is > > > > resumed, > > > > > > > then > > > > > > > > > all > > > > > > > > > > > > zombie tasks will be automatically fenced out by > their > > > > > > > successors on > > > > > > > > > > > > startup; if it's deleted, then we'll have wasted > effort > > > by > > > > > > > performing > > > > > > > > > > an > > > > > > > > > > > > unnecessary round of fencing. 
It may be nice to > > guarantee > > > > that > > > > > > > source > > > > > > > > > > > task > > > > > > > > > > > > resources will be deallocated after the connector > > > > transitions > > > > > > to > > > > > > > > > > STOPPED, > > > > > > > > > > > > but realistically, it doesn't do much to just fence > out > > > > their > > > > > > > > > > producers, > > > > > > > > > > > > since tasks can be blocked on a number of other > > > operations > > > > such > > > > > > > as > > > > > > > > > > > > key/value/header conversion, transformation, and task > > > > polling. > > > > > > > It may > > > > > > > > > > be > > > > > > > > > > > a > > > > > > > > > > > > little strange if data is produced to Kafka after the > > > > connector > > > > > > > has > > > > > > > > > > > > transitioned to STOPPED, but we can't provide the > same > > > > > > > guarantees for > > > > > > > > > > > sink > > > > > > > > > > > > connectors, since their tasks may be stuck on a > > > > long-running > > > > > > > > > > > SinkTask::put > > > > > > > > > > > > that emits data even after the Connect framework has > > > > abandoned > > > > > > > them > > > > > > > > > > after > > > > > > > > > > > > exhausting their graceful shutdown timeout. > Ultimately, > > > I'd > > > > > > > prefer to > > > > > > > > > > err > > > > > > > > > > > > on the side of consistency and ease of implementation > > for > > > > now, > > > > > > > but I > > > > > > > > > > may > > > > > > > > > > > be > > > > > > > > > > > > missing a case where a few extra records from a task > > > that's > > > > > > slow > > > > > > > to > > > > > > > > > > shut > > > > > > > > > > > > down may cause serious issues--let me know. > > > > > > > > > > > > > > > > > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED > > > state > > > > > > right > > > > > > > now > > > > > > > > > as > > > > > > > > > > > it > > > > > > > > > > > > does serve a few purposes. Leaving tasks > > idling-but-ready > > > > makes > > > > > > > > > > resuming > > > > > > > > > > > > them less disruptive across the cluster, since a > > > rebalance > > > > > > isn't > > > > > > > > > > > necessary. > > > > > > > > > > > > It also reduces latency to resume the connector, > > > > especially for > > > > > > > ones > > > > > > > > > > that > > > > > > > > > > > > have to do a lot of state gathering on initialization > > to, > > > > e.g., > > > > > > > read > > > > > > > > > > > > offsets from an external system. > > > > > > > > > > > > > > > > > > > > > > > > 7. There should be no risk of mixed tasks after a > > > > downgrade, > > > > > > > thanks > > > > > > > > > to > > > > > > > > > > > the > > > > > > > > > > > > empty set of task configs that gets published to the > > > config > > > > > > > topic. > > > > > > > > > Both > > > > > > > > > > > > upgraded and downgraded workers will render an empty > > set > > > of > > > > > > > tasks for > > > > > > > > > > the > > > > > > > > > > > > connector, and keep that set of empty tasks until the > > > > connector > > > > > > > is > > > > > > > > > > > resumed. > > > > > > > > > > > > Does that address your concerns? > > > > > > > > > > > > > > > > > > > > > > > > You're also correct that the linked Jira ticket was > > > wrong; > > > > > > > thanks for > > > > > > > > > > > > pointing that out! Yes, KAFKA-4107 is the intended > > > ticket, > > > > and > > > > > > > I've > > > > > > > > > > > updated > > > > > > > > > > > > the link in the KIP accordingly. 
> > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > > > > > > > > > [1] - > https://issues.apache.org/jira/browse/KAFKA-4107 > > > > > > > > > > > > > > > > > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya < > > > > > > > yash.ma...@gmail.com> > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks a lot for this KIP, I think something like > > this > > > > has > > > > > > been > > > > > > > > > long > > > > > > > > > > > > > overdue for Kafka Connect :) > > > > > > > > > > > > > > > > > > > > > > > > > > Some thoughts and questions that I had - > > > > > > > > > > > > > > > > > > > > > > > > > > 1. I'm wondering if you could elaborate a little > more > > > on > > > > the > > > > > > > use > > > > > > > > > case > > > > > > > > > > > for > > > > > > > > > > > > > the `DELETE /connectors/{connector}/offsets` API. I > > > > think we > > > > > > > can > > > > > > > > > all > > > > > > > > > > > > agree > > > > > > > > > > > > > that a fine grained reset API that allows setting > > > > arbitrary > > > > > > > offsets > > > > > > > > > > for > > > > > > > > > > > > > partitions would be quite useful (which you talk > > about > > > > in the > > > > > > > > > Future > > > > > > > > > > > work > > > > > > > > > > > > > section). But for the `DELETE > > > > > > /connectors/{connector}/offsets` > > > > > > > API > > > > > > > > > in > > > > > > > > > > > its > > > > > > > > > > > > > described form, it looks like it would only serve a > > > > seemingly > > > > > > > niche > > > > > > > > > > use > > > > > > > > > > > > > case where users want to avoid renaming connectors > - > > > > because > > > > > > > this > > > > > > > > > new > > > > > > > > > > > way > > > > > > > > > > > > > of resetting offsets actually has more steps (i.e. > > stop > > > > the > > > > > > > > > > connector, > > > > > > > > > > > > > reset offsets via the API, resume the connector) > than > > > > simply > > > > > > > > > deleting > > > > > > > > > > > and > > > > > > > > > > > > > re-creating the connector with a different name? > > > > > > > > > > > > > > > > > > > > > > > > > > 2. The KIP talks about taking care that the > response > > > > formats > > > > > > > > > > > (presumably > > > > > > > > > > > > > only talking about the new GET API here) are > > > symmetrical > > > > for > > > > > > > both > > > > > > > > > > > source > > > > > > > > > > > > > and sink connectors - is the end goal to have users > > of > > > > Kafka > > > > > > > > > Connect > > > > > > > > > > > not > > > > > > > > > > > > > even be aware that sink connectors use Kafka > > consumers > > > > under > > > > > > > the > > > > > > > > > hood > > > > > > > > > > > > (i.e. > > > > > > > > > > > > > have that as purely an implementation detail > > abstracted > > > > away > > > > > > > from > > > > > > > > > > > users)? > > > > > > > > > > > > > While I understand the value of uniformity here, > the > > > > response > > > > > > > > > format > > > > > > > > > > > for > > > > > > > > > > > > > sink connectors currently looks a little odd with > the > > > > > > > "partition" > > > > > > > > > > field > > > > > > > > > > > > > having "topic" and "partition" as sub-fields, > > > especially > > > > to > > > > > > > users > > > > > > > > > > > > familiar > > > > > > > > > > > > > with Kafka semantics. Thoughts? > > > > > > > > > > > > > > > > > > > > > > > > > > 3. 
Another little nitpick on the response format - > > why > > > > do we > > > > > > > need > > > > > > > > > > > > "source" > > > > > > > > > > > > > / "sink" as a field under "offsets"? Users can > query > > > the > > > > > > > connector > > > > > > > > > > type > > > > > > > > > > > > via > > > > > > > > > > > > > the existing `GET /connectors` API. If it's deemed > > > > important > > > > > > > to let > > > > > > > > > > > users > > > > > > > > > > > > > know that the offsets they're seeing correspond to > a > > > > source / > > > > > > > sink > > > > > > > > > > > > > connector, maybe we could have a top level field > > "type" > > > > in > > > > > > the > > > > > > > > > > response > > > > > > > > > > > > for > > > > > > > > > > > > > the `GET /connectors/{connector}/offsets` API > similar > > > to > > > > the > > > > > > > `GET > > > > > > > > > > > > > /connectors` API? > > > > > > > > > > > > > > > > > > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets` > > > API, > > > > the > > > > > > > KIP > > > > > > > > > > > mentions > > > > > > > > > > > > > that requests will be rejected if a rebalance is > > > pending > > > > - > > > > > > > > > presumably > > > > > > > > > > > > this > > > > > > > > > > > > > is to avoid forwarding requests to a leader which > may > > > no > > > > > > > longer be > > > > > > > > > > the > > > > > > > > > > > > > leader after the pending rebalance? In this case, > the > > > API > > > > > > will > > > > > > > > > > return a > > > > > > > > > > > > > `409 Conflict` response similar to some of the > > existing > > > > APIs, > > > > > > > > > right? > > > > > > > > > > > > > > > > > > > > > > > > > > 5. Regarding fencing out previously running tasks > > for a > > > > > > > connector, > > > > > > > > > do > > > > > > > > > > > you > > > > > > > > > > > > > think it would make more sense semantically to have > > > this > > > > > > > > > implemented > > > > > > > > > > in > > > > > > > > > > > > the > > > > > > > > > > > > > stop endpoint where an empty set of tasks is > > generated, > > > > > > rather > > > > > > > than > > > > > > > > > > the > > > > > > > > > > > > > delete offsets endpoint? This would also give the > new > > > > > > `STOPPED` > > > > > > > > > > state a > > > > > > > > > > > > > higher confidence of sorts, with any zombie tasks > > being > > > > > > fenced > > > > > > > off > > > > > > > > > > from > > > > > > > > > > > > > continuing to produce data. > > > > > > > > > > > > > > > > > > > > > > > > > > 6. Thanks for outlining the issues with the current > > > > state of > > > > > > > the > > > > > > > > > > > `PAUSED` > > > > > > > > > > > > > state - I think a lot of users expect it to behave > > like > > > > the > > > > > > > > > `STOPPED` > > > > > > > > > > > > state > > > > > > > > > > > > > you outline in the KIP and are (unpleasantly) > > surprised > > > > when > > > > > > it > > > > > > > > > > > doesn't. > > > > > > > > > > > > > However, this does beg the question of what the > > > > usefulness of > > > > > > > > > having > > > > > > > > > > > two > > > > > > > > > > > > > separate `PAUSED` and `STOPPED` states is? Do we > want > > > to > > > > > > > continue > > > > > > > > > > > > > supporting both these states in the future, or do > you > > > > see the > > > > > > > > > > `STOPPED` > > > > > > > > > > > > > state eventually causing the existing `PAUSED` > state > > to > > > > be > > > > > > > > > > deprecated? > > > > > > > > > > > > > > > > > > > > > > > > > > 7. 
I think the idea outlined in the KIP for > handling > > a > > > > new > > > > > > > state > > > > > > > > > > during > > > > > > > > > > > > > cluster downgrades / rolling upgrades is quite > > clever, > > > > but do > > > > > > > you > > > > > > > > > > think > > > > > > > > > > > > > there could be any issues with having a mix of > > "paused" > > > > and > > > > > > > > > "stopped" > > > > > > > > > > > > tasks > > > > > > > > > > > > > for the same connector across workers in a cluster? > > At > > > > the > > > > > > very > > > > > > > > > > least, > > > > > > > > > > > I > > > > > > > > > > > > > think it would be fairly confusing to most users. > I'm > > > > > > > wondering if > > > > > > > > > > this > > > > > > > > > > > > can > > > > > > > > > > > > > be avoided by stating clearly in the KIP that the > new > > > > `PUT > > > > > > > > > > > > > /connectors/{connector}/stop` > > > > > > > > > > > > > can only be used on a cluster that is fully > upgraded > > to > > > > an AK > > > > > > > > > version > > > > > > > > > > > > newer > > > > > > > > > > > > > than the one which ends up containing changes from > > this > > > > KIP > > > > > > and > > > > > > > > > that > > > > > > > > > > > if a > > > > > > > > > > > > > cluster needs to be downgraded to an older version, > > the > > > > user > > > > > > > should > > > > > > > > > > > > ensure > > > > > > > > > > > > > that none of the connectors on the cluster are in a > > > > stopped > > > > > > > state? > > > > > > > > > > With > > > > > > > > > > > > the > > > > > > > > > > > > > existing implementation, it looks like an > > > unknown/invalid > > > > > > > target > > > > > > > > > > state > > > > > > > > > > > > > record is basically just discarded (with an error > > > message > > > > > > > logged), > > > > > > > > > so > > > > > > > > > > > it > > > > > > > > > > > > > doesn't seem to be a disastrous failure scenario > that > > > can > > > > > > bring > > > > > > > > > down > > > > > > > > > > a > > > > > > > > > > > > > worker. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton > > > > > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi Ashwin, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your thoughts. Regarding your > questions: > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. The response would show the offsets that are > > > > visible to > > > > > > > the > > > > > > > > > > source > > > > > > > > > > > > > > connector, so it would combine the contents of > the > > > two > > > > > > > topics, > > > > > > > > > > giving > > > > > > > > > > > > > > priority to offsets present in the > > connector-specific > > > > > > topic. > > > > > > > I'm > > > > > > > > > > > > > imagining > > > > > > > > > > > > > > a follow-up question that some people may have in > > > > response > > > > > > to > > > > > > > > > that > > > > > > > > > > is > > > > > > > > > > > > > > whether we'd want to provide insight into the > > > contents > > > > of a > > > > > > > > > single > > > > > > > > > > > > topic > > > > > > > > > > > > > at > > > > > > > > > > > > > > a time. 
It may be useful to be able to see this > > > > information > > > > > > > in > > > > > > > > > > order > > > > > > > > > > > to > > > > > > > > > > > > > > debug connector issues or verify that it's safe > to > > > stop > > > > > > > using a > > > > > > > > > > > > > > connector-specific offsets topic (either > > explicitly, > > > or > > > > > > > > > implicitly > > > > > > > > > > > via > > > > > > > > > > > > > > cluster downgrade). What do you think about > adding > > a > > > > URL > > > > > > > query > > > > > > > > > > > > parameter > > > > > > > > > > > > > > that allows users to dictate which view of the > > > > connector's > > > > > > > > > offsets > > > > > > > > > > > they > > > > > > > > > > > > > are > > > > > > > > > > > > > > given in the REST response, with options for the > > > > worker's > > > > > > > global > > > > > > > > > > > topic, > > > > > > > > > > > > > the > > > > > > > > > > > > > > connector-specific topic, and the combined view > of > > > them > > > > > > that > > > > > > > the > > > > > > > > > > > > > connector > > > > > > > > > > > > > > and its tasks see (which would be the default)? > > This > > > > may be > > > > > > > too > > > > > > > > > > much > > > > > > > > > > > > for > > > > > > > > > > > > > V1 > > > > > > > > > > > > > > but it feels like it's at least worth exploring a > > > bit. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. There is no option for this at the moment. > Reset > > > > > > > semantics are > > > > > > > > > > > > > extremely > > > > > > > > > > > > > > coarse-grained; for source connectors, we delete > > all > > > > source > > > > > > > > > > offsets, > > > > > > > > > > > > and > > > > > > > > > > > > > > for sink connectors, we delete the entire > consumer > > > > group. > > > > > > I'm > > > > > > > > > > hoping > > > > > > > > > > > > this > > > > > > > > > > > > > > will be enough for V1 and that, if there's > > sufficient > > > > > > demand > > > > > > > for > > > > > > > > > > it, > > > > > > > > > > > we > > > > > > > > > > > > > can > > > > > > > > > > > > > > introduce a richer API for resetting or even > > > modifying > > > > > > > connector > > > > > > > > > > > > offsets > > > > > > > > > > > > > in > > > > > > > > > > > > > > a follow-up KIP. > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. Good eye :) I think it's fine to keep the > > existing > > > > > > > behavior > > > > > > > > > for > > > > > > > > > > > the > > > > > > > > > > > > > > PAUSED state with the Connector instance, since > the > > > > primary > > > > > > > > > purpose > > > > > > > > > > > of > > > > > > > > > > > > > the > > > > > > > > > > > > > > Connector is to generate task configs and monitor > > the > > > > > > > external > > > > > > > > > > system > > > > > > > > > > > > for > > > > > > > > > > > > > > changes. If there's no chance for tasks to be > > running > > > > > > > anyways, I > > > > > > > > > > > don't > > > > > > > > > > > > > see > > > > > > > > > > > > > > much value in allowing paused connectors to > > generate > > > > new > > > > > > task > > > > > > > > > > > configs, > > > > > > > > > > > > > > especially since each time that happens a > rebalance > > > is > > > > > > > triggered > > > > > > > > > > and > > > > > > > > > > > > > > there's a non-zero cost to that. What do you > think? 
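As a side note on the combined view described above, here is a rough sketch (in Java) of how such a merged response could be assembled, assuming offsets are modelled as a map from source partition to offset; the class and method names are illustrative only and are not part of the Connect runtime or of this KIP:

    import java.util.HashMap;
    import java.util.Map;

    // Sketch only: merge the worker's global offsets with a connector-specific offsets
    // topic, letting entries from the connector-specific topic win on conflicts.
    public class CombinedOffsetView {

        public static Map<Map<String, Object>, Map<String, Object>> combine(
                Map<Map<String, Object>, Map<String, Object>> globalOffsets,
                Map<Map<String, Object>, Map<String, Object>> connectorOffsets) {
            // Start from the view provided by the worker's global offsets topic...
            Map<Map<String, Object>, Map<String, Object>> combined = new HashMap<>(globalOffsets);
            // ...then overwrite with anything present in the connector-specific topic,
            // so that the connector-specific topic takes priority.
            combined.putAll(connectorOffsets);
            return combined;
        }
    }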
> > > > > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin > > > > > > > > > > <apan...@confluent.io.invalid > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks for KIP Chris - I think this is a useful > > > > feature. > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Can you please elaborate on the following in > the > > > KIP > > > > - > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 1. How would the response of GET > > > > > > > > > /connectors/{connector}/offsets > > > > > > > > > > > look > > > > > > > > > > > > > > like > > > > > > > > > > > > > > > if the worker has both global and connector > > > specific > > > > > > > offsets > > > > > > > > > > topic > > > > > > > > > > > ? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 2. How can we pass the reset options like > > shift-by > > > , > > > > > > > > > to-date-time > > > > > > > > > > > > etc. > > > > > > > > > > > > > > > using a REST API like DELETE > > > > > > > /connectors/{connector}/offsets ? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > 3. Today PAUSE operation on a connector invokes > > its > > > > stop > > > > > > > > > method - > > > > > > > > > > > > will > > > > > > > > > > > > > > > there be a change here to reduce confusion with > > the > > > > new > > > > > > > > > proposed > > > > > > > > > > > > > STOPPED > > > > > > > > > > > > > > > state ? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > > > > > Ashwin > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton > > > > > > > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I noticed a fairly large gap in the first > > version > > > > of > > > > > > > this KIP > > > > > > > > > > > that > > > > > > > > > > > > I > > > > > > > > > > > > > > > > published last Friday, which has to do with > > > > > > accommodating > > > > > > > > > > > > connectors > > > > > > > > > > > > > > > > that target different Kafka clusters than the > > one > > > > that > > > > > > > the > > > > > > > > > > Kafka > > > > > > > > > > > > > > Connect > > > > > > > > > > > > > > > > cluster uses for its internal topics and > source > > > > > > > connectors > > > > > > > > > with > > > > > > > > > > > > > > dedicated > > > > > > > > > > > > > > > > offsets topics. I've since updated the KIP to > > > > address > > > > > > > this > > > > > > > > > gap, > > > > > > > > > > > > which > > > > > > > > > > > > > > has > > > > > > > > > > > > > > > > substantially altered the design. Wanted to > > give > > > a > > > > > > > heads-up > > > > > > > > > to > > > > > > > > > > > > anyone > > > > > > > > > > > > > > > > that's already started reviewing. 
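For illustration only, a minimal Java sketch of the stop/reset/resume sequence discussed in this thread, using the proposed `PUT /connectors/{connector}/stop` and `DELETE /connectors/{connector}/offsets` endpoints together with the existing resume endpoint; the worker URL, connector name, and the retry-on-409 handling are assumptions based on the rebalance-pending behaviour described in this thread, not a definitive client:

    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;

    public class OffsetResetExample {
        private static final String WORKER = "http://localhost:8083"; // placeholder worker URL
        private static final String CONNECTOR = "my-connector";       // placeholder connector name

        public static void main(String[] args) throws Exception {
            HttpClient client = HttpClient.newHttpClient();

            // 1. Stop the connector (proposed PUT /connectors/{connector}/stop endpoint).
            send(client, HttpRequest.newBuilder(URI.create(WORKER + "/connectors/" + CONNECTOR + "/stop"))
                    .PUT(HttpRequest.BodyPublishers.noBody()).build());

            // 2. Reset its offsets; a 409 indicates a pending rebalance, so retry later.
            HttpResponse<String> reset = send(client, HttpRequest.newBuilder(
                    URI.create(WORKER + "/connectors/" + CONNECTOR + "/offsets"))
                    .DELETE().build());
            if (reset.statusCode() == 409) {
                System.out.println("Rebalance pending; retry the offsets reset later");
                return;
            }

            // 3. Resume the connector (existing PUT /connectors/{connector}/resume endpoint).
            send(client, HttpRequest.newBuilder(URI.create(WORKER + "/connectors/" + CONNECTOR + "/resume"))
                    .PUT(HttpRequest.BodyPublishers.noBody()).build());
        }

        private static HttpResponse<String> send(HttpClient client, HttpRequest request) throws Exception {
            HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(request.method() + " " + request.uri() + " -> " + response.statusCode());
            return response;
        }
    }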
> > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton > < > > > > > > > > > chr...@aiven.io> > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Hi all, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > I'd like to begin discussion on a KIP to > add > > > > offsets > > > > > > > > > support > > > > > > > > > > to > > > > > > > > > > > > the > > > > > > > > > > > > > > > Kafka > > > > > > > > > > > > > > > > > Connect REST API: > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > > > > > > > > > Thanks again for your thoughts! Responses to ongoing > > > > discussions > > > > > > > inline > > > > > > > > > > (easier to track context than referencing comment > numbers): > > > > > > > > > > > > > > > > > > > > > However, this then leads me to wonder if we can make > that > > > > > > explicit > > > > > > > by > > > > > > > > > > including "connect" or "connector" in the higher level > > field > > > > names? > > > > > > > Or do > > > > > > > > > > you think this isn't required given that we're talking > > about > > > a > > > > > > > Connect > > > > > > > > > > specific REST API in the first place? > > > > > > > > > > > > > > > > > > > > I think "partition" and "offset" are fine as field names > > but > > > > I'm > > > > > > not > > > > > > > > > hugely > > > > > > > > > > opposed to adding "connector " as a prefix to them; would > > be > > > > > > > interested > > > > > > > > > in > > > > > > > > > > others' thoughts. > > > > > > > > > > > > > > > > > > > > > I'm not sure I followed why the unresolved writes to > the > > > > config > > > > > > > topic > > > > > > > > > > would be an issue - wouldn't the delete offsets request > be > > > > added to > > > > > > > the > > > > > > > > > > herder's request queue and whenever it is processed, we'd > > > > anyway > > > > > > > need to > > > > > > > > > > check if all the prerequisites for the request are > > satisfied? > > > > > > > > > > > > > > > > > > > > Some requests are handled in multiple steps. For example, > > > > deleting > > > > > > a > > > > > > > > > > connector (1) adds a request to the herder queue to > write a > > > > > > > tombstone to > > > > > > > > > > the config topic (or, if the worker isn't the leader, > > forward > > > > the > > > > > > > request > > > > > > > > > > to the leader). 
(2) Once that tombstone is picked up, > (3) a > > > > > > rebalance > > > > > > > > > > ensues, and then after it's finally complete, (4) the > > > > connector and > > > > > > > its > > > > > > > > > > tasks are shut down. I probably could have used better > > > > terminology, > > > > > > > but > > > > > > > > > > what I meant by "unresolved writes to the config topic" > > was a > > > > case > > > > > > in > > > > > > > > > > between steps (2) and (3)--where the worker has already > > read > > > > that > > > > > > > > > tombstone > > > > > > > > > > from the config topic and knows that a rebalance is > > pending, > > > > but > > > > > > > hasn't > > > > > > > > > > begun participating in that rebalance yet. In the > > > > DistributedHerder > > > > > > > > > class, > > > > > > > > > > this is done via the `checkRebalanceNeeded` method. > > > > > > > > > > > > > > > > > > > > > We can probably revisit this potential deprecation [of > > the > > > > PAUSED > > > > > > > > > state] > > > > > > > > > > in the future based on user feedback and how the adoption > > of > > > > the > > > > > > new > > > > > > > > > > proposed stop endpoint looks like, what do you think? > > > > > > > > > > > > > > > > > > > > Yeah, revisiting in the future seems reasonable. 👍 > > > > > > > > > > > > > > > > > > > > And responses to new comments here: > > > > > > > > > > > > > > > > > > > > 8. Yep, we'll start tracking offsets by connector. I > don't > > > > believe > > > > > > > this > > > > > > > > > > should be too difficult, and suspect that the only reason > > we > > > > track > > > > > > > raw > > > > > > > > > byte > > > > > > > > > > arrays instead of pre-deserializing offset topic > > information > > > > into > > > > > > > > > something > > > > > > > > > > more useful is because Connect originally had pluggable > > > > internal > > > > > > > > > > converters. Now that we're hardcoded to use the JSON > > > converter > > > > it > > > > > > > should > > > > > > > > > be > > > > > > > > > > fine to track offsets on a per-connector basis as they're > > > read > > > > from > > > > > > > the > > > > > > > > > > offsets topic. > > > > > > > > > > > > > > > > > > > > 9. I'm hesitant to introduce this type of feature right > now > > > > because > > > > > > > of > > > > > > > > > all > > > > > > > > > > of the gotchas that would come with it. In > > security-conscious > > > > > > > > > environments, > > > > > > > > > > it's possible that a sink connector's principal may have > > > > access to > > > > > > > the > > > > > > > > > > consumer group used by the connector, but the worker's > > > > principal > > > > > > may > > > > > > > not. > > > > > > > > > > There's also the case where source connectors have > separate > > > > offsets > > > > > > > > > topics, > > > > > > > > > > or sink connectors have overridden consumer group IDs, or > > > sink > > > > or > > > > > > > source > > > > > > > > > > connectors work against a different Kafka cluster than > the > > > one > > > > that > > > > > > > their > > > > > > > > > > worker uses. Overall, I'd rather provide a single API > that > > > > works in > > > > > > > all > > > > > > > > > > cases rather than risk confusing and alienating users by > > > > trying to > > > > > > > make > > > > > > > > > > their lives easier in a subset of cases. > > > > > > > > > > > > > > > > > > > > 10. Hmm... 
I don't think the order of the writes matters > > too > > > > much > > > > > > > here, > > > > > > > > > but > > > > > > > > > > we probably could start by deleting from the global topic > > > > first, > > > > > > > that's > > > > > > > > > > true. The reason I'm not hugely concerned about this case > > is > > > > that > > > > > > if > > > > > > > > > > something goes wrong while resetting offsets, there's no > > > > immediate > > > > > > > > > > impact--the connector will still be in the STOPPED state. > > The > > > > REST > > > > > > > > > response > > > > > > > > > > for requests to reset the offsets will clearly call out > > that > > > > the > > > > > > > > > operation > > > > > > > > > > has failed, and if necessary, we can probably also add a > > > > > > > scary-looking > > > > > > > > > > warning message stating that we can't guarantee which > > offsets > > > > have > > > > > > > been > > > > > > > > > > successfully wiped and which haven't. Users can query the > > > exact > > > > > > > offsets > > > > > > > > > of > > > > > > > > > > the connector at this point to determine what will happen > > > > if/what > > > > > > > they > > > > > > > > > > resume it. And they can repeat attempts to reset the > > offsets > > > as > > > > > > many > > > > > > > > > times > > > > > > > > > > as they'd like until they get back a 2XX response, > > indicating > > > > that > > > > > > > it's > > > > > > > > > > finally safe to resume the connector. Thoughts? > > > > > > > > > > > > > > > > > > > > 11. I haven't thought too much about it. I think > something > > > > like the > > > > > > > > > > Monitorable* connectors would probably serve our needs > > here; > > > > we can > > > > > > > > > > instantiate them on a running Connect cluster and then > use > > > > various > > > > > > > > > handles > > > > > > > > > > to know how many times they've been polled, committed > > > records, > > > > etc. > > > > > > > If > > > > > > > > > > necessary we can tweak those classes or even write our > own. > > > But > > > > > > > anyways, > > > > > > > > > > once that's all done, the test will be something like > > > "create a > > > > > > > > > connector, > > > > > > > > > > wait for it to produce N records (each of which contains > > some > > > > kind > > > > > > of > > > > > > > > > > predictable offset), and ensure that the offsets for it > in > > > the > > > > REST > > > > > > > API > > > > > > > > > > match up with the ones we'd expect from N records". Does > > that > > > > > > answer > > > > > > > your > > > > > > > > > > question? > > > > > > > > > > > > > > > > > > > > Cheers, > > > > > > > > > > > > > > > > > > > > Chris > > > > > > > > > > > > > > > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya < > > > > yash.ma...@gmail.com> > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > Hi Chris, > > > > > > > > > > > > > > > > > > > > > > 1. Thanks a lot for elaborating on this, I'm now > > convinced > > > > about > > > > > > > the > > > > > > > > > > > usefulness of the new offset reset endpoint. Regarding > > the > > > > > > > follow-up > > > > > > > > > KIP > > > > > > > > > > > for a fine-grained offset write API, I'd be happy to > take > > > > that on > > > > > > > once > > > > > > > > > > this > > > > > > > > > > > KIP is finalized and I will definitely look forward to > > your > > > > > > > feedback on > > > > > > > > > > > that one! > > > > > > > > > > > > > > > > > > > > > > 2. Gotcha, the motivation makes more sense to me now. 
> So > > > the > > > > > > higher > > > > > > > > > level > > > > > > > > > > > partition field represents a Connect specific "logical > > > > partition" > > > > > > > of > > > > > > > > > > sorts > > > > > > > > > > > - i.e. the source partition as defined by a connector > for > > > > source > > > > > > > > > > connectors > > > > > > > > > > > and a Kafka topic + partition for sink connectors. I > like > > > the > > > > > > idea > > > > > > > of > > > > > > > > > > > adding a Kafka prefix to the lower level > partition/offset > > > > (and > > > > > > > topic) > > > > > > > > > > > fields which basically makes it more clear (although > > > > implicitly) > > > > > > > that > > > > > > > > > the > > > > > > > > > > > higher level partition/offset field is Connect specific > > and > > > > not > > > > > > the > > > > > > > > > same > > > > > > > > > > as > > > > > > > > > > > what those terms represent in Kafka itself. However, > this > > > > then > > > > > > > leads me > > > > > > > > > > to > > > > > > > > > > > wonder if we can make that explicit by including > > "connect" > > > or > > > > > > > > > "connector" > > > > > > > > > > > in the higher level field names? Or do you think this > > isn't > > > > > > > required > > > > > > > > > > given > > > > > > > > > > > that we're talking about a Connect specific REST API in > > the > > > > first > > > > > > > > > place? > > > > > > > > > > > > > > > > > > > > > > 3. Thanks, I think the response structure definitely > > looks > > > > better > > > > > > > now! > > > > > > > > > > > > > > > > > > > > > > 4. Interesting, I'd be curious to learn why we might > want > > > to > > > > > > change > > > > > > > > > this > > > > > > > > > > in > > > > > > > > > > > the future but that's probably out of scope for this > > > > discussion. > > > > > > > I'm > > > > > > > > > not > > > > > > > > > > > sure I followed why the unresolved writes to the config > > > topic > > > > > > > would be > > > > > > > > > an > > > > > > > > > > > issue - wouldn't the delete offsets request be added to > > the > > > > > > > herder's > > > > > > > > > > > request queue and whenever it is processed, we'd anyway > > > need > > > > to > > > > > > > check > > > > > > > > > if > > > > > > > > > > > all the prerequisites for the request are satisfied? > > > > > > > > > > > > > > > > > > > > > > 5. Thanks for elaborating that just fencing out the > > > producer > > > > > > still > > > > > > > > > leaves > > > > > > > > > > > many cases where source tasks remain hanging around and > > > also > > > > that > > > > > > > we > > > > > > > > > > anyway > > > > > > > > > > > can't have similar data production guarantees for sink > > > > connectors > > > > > > > right > > > > > > > > > > > now. I agree that it might be better to go with ease of > > > > > > > implementation > > > > > > > > > > and > > > > > > > > > > > consistency for now. > > > > > > > > > > > > > > > > > > > > > > 6. Right, that does make sense but I still feel like > the > > > two > > > > > > states > > > > > > > > > will > > > > > > > > > > > end up being confusing to end users who might not be > able > > > to > > > > > > > discern > > > > > > > > > the > > > > > > > > > > > (fairly low-level) differences between them (also the > > > > nuances of > > > > > > > state > > > > > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED > > > with > > > > the > > > > > > > > > > > rebalancing implications as well). 
We can probably > > revisit > > > > this > > > > > > > > > potential > > > > > > > > > > > deprecation in the future based on user feedback and > how > > > the > > > > > > > adoption > > > > > > > > > of > > > > > > > > > > > the new proposed stop endpoint looks like, what do you > > > think? > > > > > > > > > > > > > > > > > > > > > > 7. Aha, that is completely my bad, I missed that the > > v1/v2 > > > > state > > > > > > is > > > > > > > > > only > > > > > > > > > > > applicable to the connector's target state and that we > > > don't > > > > need > > > > > > > to > > > > > > > > > > worry > > > > > > > > > > > about the tasks since we will have an empty set of > > tasks. I > > > > > > think I > > > > > > > > > was a > > > > > > > > > > > little confused by "pause the parts of the connector > that > > > > they > > > > > > are > > > > > > > > > > > assigned" from the KIP. Thanks for clarifying that! > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Some more thoughts and questions that I had - > > > > > > > > > > > > > > > > > > > > > > 8. Could you elaborate on what the implementation for > > > offset > > > > > > reset > > > > > > > for > > > > > > > > > > > source connectors would look like? Currently, it > doesn't > > > look > > > > > > like > > > > > > > we > > > > > > > > > > track > > > > > > > > > > > all the partitions for a source connector anywhere. > Will > > we > > > > need > > > > > > to > > > > > > > > > > > book-keep this somewhere in order to be able to emit a > > > > tombstone > > > > > > > record > > > > > > > > > > for > > > > > > > > > > > each source partition? > > > > > > > > > > > > > > > > > > > > > > 9. The KIP describes the offset reset endpoint as only > > > being > > > > > > > usable on > > > > > > > > > > > existing connectors that are in a `STOPPED` state. Why > > > > wouldn't > > > > > > we > > > > > > > want > > > > > > > > > > to > > > > > > > > > > > allow resetting offsets for a deleted connector which > > seems > > > > to > > > > > > be a > > > > > > > > > valid > > > > > > > > > > > use case? Or do we plan to handle this use case only > via > > > the > > > > item > > > > > > > > > > outlined > > > > > > > > > > > in the future work section - "Automatically delete > > offsets > > > > with > > > > > > > > > > > connectors"? > > > > > > > > > > > > > > > > > > > > > > 10. The KIP mentions that source offsets will be reset > > > > > > > transactionally > > > > > > > > > > for > > > > > > > > > > > each topic (worker global offset topic and connector > > > specific > > > > > > > offset > > > > > > > > > > topic > > > > > > > > > > > if it exists). While it obviously isn't possible to > > > > atomically do > > > > > > > the > > > > > > > > > > > writes to two topics which may be on different Kafka > > > > clusters, > > > > > > I'm > > > > > > > > > > > wondering about what would happen if the first > > transaction > > > > > > > succeeds but > > > > > > > > > > the > > > > > > > > > > > second one fails. 
I think the order of the two > > transactions > > > > > > matters > > > > > > > > > here > > > > > > > > > > - > > > > > > > > > > > if we successfully emit tombstones to the connector > > > specific > > > > > > offset > > > > > > > > > topic > > > > > > > > > > > and fail to do so for the worker global offset topic, > > we'll > > > > > > > presumably > > > > > > > > > > fail > > > > > > > > > > > the offset delete request because the KIP mentions that > > "A > > > > > > request > > > > > > > to > > > > > > > > > > reset > > > > > > > > > > > offsets for a source connector will only be considered > > > > successful > > > > > > > if > > > > > > > > > the > > > > > > > > > > > worker is able to delete all known offsets for that > > > > connector, on > > > > > > > both > > > > > > > > > > the > > > > > > > > > > > worker's global offsets topic and (if one is used) the > > > > > > connector's > > > > > > > > > > > dedicated offsets topic.". However, this will lead to > the > > > > > > connector > > > > > > > > > only > > > > > > > > > > > being able to read potentially older offsets from the > > > worker > > > > > > global > > > > > > > > > > offset > > > > > > > > > > > topic on resumption (based on the combined offset view > > > > presented > > > > > > as > > > > > > > > > > > described in KIP-618 [1]). So, I think we should make > > sure > > > > that > > > > > > the > > > > > > > > > > worker > > > > > > > > > > > global offset topic tombstoning is attempted first, > > right? > > > > Note > > > > > > > that in > > > > > > > > > > the > > > > > > > > > > > current implementation of > > > > `ConnectorOffsetBackingStore::set`, the > > > > > > > > > > primary / > > > > > > > > > > > connector specific offset store is written to first. > > > > > > > > > > > > > > > > > > > > > > 11. This probably isn't necessary to elaborate on in > the > > > KIP > > > > > > > itself, > > > > > > > > > but > > > > > > > > > > I > > > > > > > > > > > was wondering what the second offset test - "verify > that > > > that > > > > > > those > > > > > > > > > > offsets > > > > > > > > > > > reflect an expected level of progress for each > connector > > > > (i.e., > > > > > > > they > > > > > > > > > are > > > > > > > > > > > greater than or equal to a certain value depending on > how > > > the > > > > > > > > > connectors > > > > > > > > > > > are configured and how long they have been running)" - > > > would > > > > look > > > > > > > like? > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > Thanks, > > > > > > > > > > > Yash > > > > > > > > > > > > > > > > > > > > > > [1] - > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration > > > > > > > > > > > > > > > > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton > > > > > > > <chr...@aiven.io.invalid > > > > > > > > > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > > > > > > > > > Hi Yash, > > > > > > > > > > > > > > > > > > > > > > > > Thanks for your detailed thoughts. > > > > > > > > > > > > > > > > > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly > > > what's > > > > > > > proposed > > > > > > > > > in > > > > > > > > > > > the > > > > > > > > > > > > KIP right now: a way to reset offsets for connectors. 
> > > Sure, > > > > > > > there's > > > > > > > > > an > > > > > > > > > > > > extra step of stopping the connector, but renaming a > > > > connector > > > > > > > isn't > > > > > > > > > as > > > > > > > > > > > > convenient of an alternative as it may seem since in > > many > > > > cases > > > > > > > you'd > > > > > > > > > > > also > > > > > > > > > > > > want to delete the older one, so the complete > sequence > > of > > > > steps > > > > > > > would > > > > > > > > > > be > > > > > > > > > > > > something like delete the old connector, rename it > > > > (possibly > > > > > > > > > requiring > > > > > > > > > > > > modifications to its config file, depending on which > > API > > > is > > > > > > > used), > > > > > > > > > then > > > > > > > > > > > > create the renamed variant. It's also just not a > great > > > user > > > > > > > > > > > > experience--even if the practical impacts are limited > > > > (which, > > > > > > > IMO, > > > > > > > > > they > > > > > > > > > > > are > > > > > > > > > > > > not), people have been asking for years about why > they > > > > have to > > > > > > > employ > > > > > > > > > > > this > > > > > > > > > > > > kind of a workaround for a fairly common use case, > and > > we > > > > don't > > > > > > > > > really > > > > > > > > > > > have > > > > > > > > > > > > a good answer beyond "we haven't implemented > something > > > > better > > > > > > > yet". > > > > > > > > > On > > > > > > > > > > > top > > > > > > > > > > > > of that, you may have external tooling that needs to > be > > > > tweaked > > > > > > > to > > > > > > > > > > > handle a > > > > > > > > > > > > new connector name, you may have strict authorization > > > > policies > > > > > > > around > > > > > > > > > > who > > > > > > > > > > > > can access what connectors, you may have other ACLs > > > > attached to > > > > > > > the > > > > > > > > > > name > > > > > > > > > > > of > > > > > > > > > > > > the connector (which can be especially common in the > > case > > > > of > > > > > > sink > > > > > > > > > > > > connectors, whose consumer group IDs are tied to > their > > > > names by > > > > > > > > > > default), > > > > > > > > > > > > and leaving around state in the offsets topic that > can > > > > never be > > > > > > > > > cleaned > > > > > > > > > > > up > > > > > > > > > > > > presents a bit of a footgun for users. It may not be > a > > > > silver > > > > > > > bullet, > > > > > > > > > > but > > > > > > > > > > > > providing some mechanism to reset that state is a > step > > in > > > > the > > > > > > > right > > > > > > > > > > > > direction and allows responsible users to more > > carefully > > > > > > > administer > > > > > > > > > > their > > > > > > > > > > > > cluster without resorting to non-public APIs. That > > said, > > > I > > > > do > > > > > > > agree > > > > > > > > > > that > > > > > > > > > > > a > > > > > > > > > > > > fine-grained reset/overwrite API would be useful, and > > I'd > > > > be > > > > > > > happy to > > > > > > > > > > > > review a KIP to add that feature if anyone wants to > > > tackle > > > > it! > > > > > > > > > > > > > > > > > > > > > > > > 2. Keeping the two formats symmetrical is motivated > > > mostly > > > > by > > > > > > > > > > aesthetics > > > > > > > > > > > > and quality-of-life for programmatic interaction with > > the > > > > API; > > > > > > > it's > > > > > > > > > not > > > > > > > > > > > > really a goal to hide the use of consumer groups from > > > > users. 
I > > > > > > do > > > > > > > > > agree > > > > > > > > > > > > that the format is a little strange-looking for sink > > > > > > connectors, > > > > > > > but > > > > > > > > > it > > > > > > > > > > > > seemed like it would be easier to work with for UIs, > > > > casual jq > > > > > > > > > queries, > > > > > > > > > > > and > > > > > > > > > > > > CLIs than a more Kafka-specific alternative such as > > > > > > > > > > > {"<topic>-<partition>": > > > > > > > > > > > > "<offset>"}, and although it is a little strange, I > > don't > > > > think > > > > > > > it's > > > > > > > > > > any > > > > > > > > > > > > less readable or intuitive. That said, I've made some > > > > tweaks to > > > > > > > the > > > > > > > > > > > format > > > > > > > > > > > > that should make programmatic access even easier; > > > > specifically, > > > > > > > I've > > > > > > > > > > > > removed the "source" and "sink" wrapper fields and > > > instead > > > > > > moved > > > > > > > them > > > > > > > > > > > into > > > > > > > > > > > > a top-level object with a "type" and "offsets" field, > > > just > > > > like > > > > > > > you > > > > > > > > > > > > suggested in point 3 (thanks!). We might also > consider > > > > changing > > > > > > > the > > > > > > > > > > field > > > > > > > > > > > > names for sink offsets from "topic", "partition", and > > > > "offset" > > > > > > to > > > > > > > > > > "Kafka > > > > > > > > > > > > topic", "Kafka partition", and "Kafka offset" > > > > respectively, to > > > > > > > reduce > > > > > > > > > > the > > > > > > > > > > > > stuttering effect of having a "partition" field > inside > > a > > > > > > > "partition" > > > > > > > > > > > field > > > > > > > > > > > > and the same with an "offset" field; thoughts? One > > final > > > > > > > point--by > > > > > > > > > > > equating > > > > > > > > > > > > source and sink offsets, we probably make it easier > for > > > > users > > > > > > to > > > > > > > > > > > understand > > > > > > > > > > > > exactly what a source offset is; anyone who's > familiar > > > with > > > > > > > consumer > > > > > > > > > > > > offsets can see from the response format that we > > > identify a > > > > > > > logical > > > > > > > > > > > > partition as a combination of two entities (a topic > > and a > > > > > > > partition > > > > > > > > > > > > number); it should make it easier to grok what a > source > > > > offset > > > > > > > is by > > > > > > > > > > > seeing > > > > > > > > > > > > what the two formats have in common. > > > > > > > > > > > > > > > > > > > > > > > > 3. Great idea! Done. > > > > > > > > > > > > > > > > > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the > > > > response > > > > > > > status > > > > > > > > > > if > > > > > > > > > > > a > > > > > > > > > > > > rebalance is pending. I'd rather not add this to the > > KIP > > > > as we > > > > > > > may > > > > > > > > > want > > > > > > > > > > > to > > > > > > > > > > > > change it at some point and it doesn't seem vital to > > > > establish > > > > > > > it as > > > > > > > > > > part > > > > > > > > > > > > of the public contract for the new endpoint right > now. 
> > > > Also, > > > > > > > small > > > > > > > > > > > > point--yes, a 409 is useful to avoid forwarding > > requests > > > > to an > > > > > > > > > > incorrect > > > > > > > > > > > > leader, but it's also useful to ensure that there > > aren't > > > > any > > > > > > > > > unresolved > > > > > > > > > > > > writes to the config topic that might cause issues > with > > > the > > > > > > > request > > > > > > > > > > (such > > > > > > > > > > > > as deleting the connector). > > > > > > > > > > > > > > > > > > > > > > > > 5. That's a good point--it may be misleading to call > a > > > > > > connector > > > > > > > > > > STOPPED > > > > > > > > > > > > when it has zombie tasks lying around on the > cluster. I > > > > don't > > > > > > > think > > > > > > > > > > it'd > > > > > > > > > > > be > > > > > > > > > > > > appropriate to do this synchronously while handling > > > > requests to > > > > > > > the > > > > > > > > > PUT > > > > > > > > > > > > /connectors/{connector}/stop since we'd want to give > > all > > > > > > > > > > > currently-running > > > > > > > > > > > > tasks a chance to gracefully shut down, though. I'm > > also > > > > not > > > > > > sure > > > > > > > > > that > > > > > > > > > > > this > > > > > > > > > > > > is a significant problem, either. If the connector is > > > > resumed, > > > > > > > then > > > > > > > > > all > > > > > > > > > > > > zombie tasks will be automatically fenced out by > their > > > > > > > successors on > > > > > > > > > > > > startup; if it's deleted, then we'll have wasted > effort > > > by > > > > > > > performing > > > > > > > > > > an > > > > > > > > > > > > unnecessary round of fencing. It may be nice to > > guarantee > > > > that > > > > > > > source > > > > > > > > > > > task > > > > > > > > > > > > resources will be deallocated after the connector > > > > transitions > > > > > > to > > > > > > > > > > STOPPED, > > > > > > > > > > > > but realistically, it doesn't do much to just fence > out > > > > their > > > > > > > > > > producers, > > > > > > > > > > > > since tasks can be blocked on a number of other > > > operations > > > > such > > > > > > > as > > > > > > > > > > > > key/value/header conversion, transformation, and task > > > > polling. > > > > > > > It may > > > > > > > > > > be > > > > > > > > > > > a > > > > > > > > > > > > little strange if data is produced to Kafka after the > > > > connector > > > > > > > has > > > > > > > > > > > > transitioned to STOPPED, but we can't provide the > same > > > > > > > guarantees for > > > > > > > > > > > sink > > > > > > > > > > > > connectors, since their tasks may be stuck on a > > > > long-running > > > > > > > > > > > SinkTask::put > > > > > > > > > > > > that emits data even after the Connect framework has > > > > abandoned > > > > > > > them > > > > > > > > > > after > > > > > > > > > > > > exhausting their graceful shutdown timeout. > Ultimately, > > > I'd > > > > > > > prefer to > > > > > > > > > > err > > > > > > > > > > > > on the side of consistency and ease of implementation > > for > > > > now, > > > > > > > but I > > > > > > > > > > may > > > > > > > > > > > be > > > > > > > > > > > > missing a case where a few extra records from a task > > > that's > > > > > > slow > > > > > > > to > > > > > > > > > > shut > > > > > > > > > > > > down may cause serious issues--let me know. > > > > > > > > > > > > > > > > > > > > > > > > 6. 
> > > > > > > > > > > > I'm hesitant to propose deprecation of the PAUSED state right now as it does serve a few purposes. Leaving tasks idling-but-ready makes resuming them less disruptive across the cluster, since a rebalance isn't necessary. It also reduces latency to resume the connector, especially for ones that have to do a lot of state gathering on initialization to, e.g., read offsets from an external system.
> > > > > > > > > > > >
> > > > > > > > > > > > 7. There should be no risk of mixed tasks after a downgrade, thanks to the empty set of task configs that gets published to the config topic. Both upgraded and downgraded workers will render an empty set of tasks for the connector, and keep that set of empty tasks until the connector is resumed. Does that address your concerns?
> > > > > > > > > > > >
> > > > > > > > > > > > You're also correct that the linked Jira ticket was wrong; thanks for pointing that out! Yes, KAFKA-4107 is the intended ticket, and I've updated the link in the KIP accordingly.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
> > > > > > > > > > > >
> > > > > > > > > > > > [1] - https://issues.apache.org/jira/browse/KAFKA-4107
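Finally, as a rough illustration of the source offset reset mechanics discussed earlier in this thread (emitting a tombstone for every known source partition, on the worker's global offsets topic and, if one is used, the connector's dedicated offsets topic), here is a hypothetical sketch; it omits the transactional writes described in the KIP, and the pre-serialized partition keys and producer handles are assumed inputs rather than actual Connect runtime APIs:

    import java.util.List;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    // Sketch only: reset a source connector's offsets by writing a tombstone (null value)
    // for every known source partition, first to the worker's global offsets topic and
    // then to the connector's dedicated offsets topic, if it has one.
    public class SourceOffsetReset {

        public static void resetOffsets(List<byte[]> partitionKeys,
                                        Producer<byte[], byte[]> globalTopicProducer, String globalTopic,
                                        Producer<byte[], byte[]> connectorTopicProducer, String connectorTopic) {
            // Tombstone every known partition on the worker's global offsets topic first.
            for (byte[] key : partitionKeys) {
                globalTopicProducer.send(new ProducerRecord<>(globalTopic, key, null));
            }
            globalTopicProducer.flush();

            // Then tombstone the connector-specific offsets topic, if the connector uses one.
            if (connectorTopicProducer != null && connectorTopic != null) {
                for (byte[] key : partitionKeys) {
                    connectorTopicProducer.send(new ProducerRecord<>(connectorTopic, key, null));
                }
                connectorTopicProducer.flush();
            }
        }
    }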