Hi Chris,

Thanks for clarifying; I had missed that update in the KIP (the bit about
the altering/resetting offsets response). I think your arguments for not
going with an additional method or a custom return type make sense.

Thanks,
Yash

On Sat, Dec 10, 2022 at 12:28 AM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Yash,
>
> The idea with the boolean is to just signify that a connector has
> overridden this method, which allows us to issue a definitive response in
> the REST API when servicing offset alter/reset requests (described in more
> detail here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Altering/resettingoffsets(response)
> ).
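>
> For anyone skimming the thread, here's a rough sketch of the shape under
> discussion (the name and signature are illustrative only; the KIP has the
> actual proposal):
>
>     // Hypothetical hook on org.apache.kafka.connect.source.SourceConnector
>     public boolean alterOffsets(Map<String, String> connectorConfig,
>                                 Map<Map<String, ?>, Map<String, ?>> offsets) {
>         // Default: not overridden, so the REST API can only report that
>         // support for altering/resetting offsets is unknown. Overriding
>         // implementations validate or apply the requested offsets and
>         // return true (or throw to fail the request outright).
>         return false;
>     }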
>
> One alternative could be to add some kind of OffsetAlterResult return type
> for this method with constructors like "OffsetAlterResult.success()",
> "OffsetAlterResult.unknown()" (default), and
> "OffsetAlterResult.failure(Throwable t)", but it's hard to envision a
> reason to use the "failure" static factory method instead of just throwing
> an exception, at which point we're only left with two reasonable methods:
> success and unknown.
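>
> Sketched out, that rejected alternative would look something like this
> (hypothetical class, shown only for comparison):
>
>     // Hypothetical return type for the offsets hook (not proposed)
>     public final class OffsetAlterResult {
>         public enum Outcome { SUCCESS, UNKNOWN, FAILURE }
>
>         public final Outcome outcome;
>         public final Throwable error;
>
>         private OffsetAlterResult(Outcome outcome, Throwable error) {
>             this.outcome = outcome;
>             this.error = error;
>         }
>
>         public static OffsetAlterResult success() {
>             return new OffsetAlterResult(Outcome.SUCCESS, null);
>         }
>
>         // Default when the connector does not override the hook
>         public static OffsetAlterResult unknown() {
>             return new OffsetAlterResult(Outcome.UNKNOWN, null);
>         }
>
>         // Hard to justify over simply throwing from the hook
>         public static OffsetAlterResult failure(Throwable t) {
>             return new OffsetAlterResult(Outcome.FAILURE, t);
>         }
>     }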
>
> Another could be to add a "boolean canResetOffsets()" method to the
> SourceConnector class, but adding a separate method seems like overkill,
> and developers may not understand that implementing that method to always
> return "false" won't actually prevent offset reset requests from taking
> place.
>
> One final option could be to have something like an AlterOffsetsSupport
> enum with values SUPPORTED and UNSUPPORTED and a new "AlterOffsetsSupport
> alterOffsetsSupport()" SourceConnector method that returns null by default
> (which implicitly maps to the "unknown support" response message in the
> REST API). This would line up with the ExactlyOnceSupport API we added in
> KIP-618. However, I'm hesitant to adopt it because it's not strictly
> necessary to address the cases that we want to cover; everything can be
> handled with a single method that gets invoked on offset alter/reset
> requests. With exactly-once support, we added this hook because it was
> designed to be invoked at a different point in the connector lifecycle.
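>
> For completeness, that option would look roughly like this (again
> hypothetical, modeled on the ExactlyOnceSupport API from KIP-618):
>
>     // Hypothetical enum describing a connector's support for the feature
>     public enum AlterOffsetsSupport {
>         SUPPORTED,
>         UNSUPPORTED
>     }
>
>     // Hypothetical SourceConnector method; the default of null maps to
>     // the "unknown support" response message in the REST API
>     public AlterOffsetsSupport alterOffsetsSupport() {
>         return null;
>     }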
>
> Cheers,
>
> Chris
>
> On Wed, Dec 7, 2022 at 9:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Sorry for the late reply.
> >
> > > I don't believe logging an error message is sufficient for
> > > handling failures to reset-after-delete. IMO it's highly
> > > likely that users will either shoot themselves in the foot
> > > by not reading the fine print and realizing that the offset
> > > request may have failed, or will ask for better visibility
> > > into the success or failure of the reset request than
> > > scanning log files.
> >
> > Your reasoning for deferring the reset offsets after delete functionality
> > to a separate KIP makes sense, thanks for the explanation.
> >
> > > I've updated the KIP with the
> > > developer-facing API changes for this logic
> >
> > This is great; I hadn't considered the other two (very valid) use cases
> > for such an API. Thanks for adding these with elaborate documentation!
> > However, the significance / use of the boolean value returned by the two
> > methods is not fully clear; could you please clarify?
> >
> > Thanks,
> > Yash
> >
> > On Fri, Nov 18, 2022 at 1:06 AM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Yash,
> > >
> > > I've updated the KIP with the correct "kafka_topic", "kafka_partition",
> > and
> > > "kafka_offset" keys in the JSON examples (settled on those instead of
> > > prefixing with "Kafka " for better interactions with tooling like jq).
> > I've
> > > also added a note about sink offset requests failing if there are still
> > > active members in the consumer group.
> > >
> > > I don't believe logging an error message is sufficient for handling
> > > failures to reset-after-delete. IMO it's highly likely that users will
> > > either shoot themselves in the foot by not reading the fine print and
> > > realizing that the offset request may have failed, or will ask for
> better
> > > visibility into the success or failure of the reset request than
> scanning
> > > log files. I don't doubt that there are ways to address this, but I
> would
> > > prefer to leave them to a separate KIP since the required design work
> is
> > > non-trivial and I do not feel that the added burden is worth tying to
> > this
> > > KIP as a blocker.
> > >
> > > I was really hoping to avoid introducing a change to the
> developer-facing
> > > APIs with this KIP, but after giving it some thought I think this may
> be
> > > unavoidable. It's debatable whether validation of altered offsets is a
> > good
> > > enough use case on its own for this kind of API, but since there are
> also
> > > connectors out there that manage offsets externally, we should probably
> > add
> > > a hook to allow those external offsets to be managed, which can then
> > serve
> > > double or even triple duty as a hook to validate custom offsets and to
> > > notify users whether offset resets/alterations are supported at all
> > (which
> > > they may not be if, for example, offsets are coupled tightly with the
> > data
> > > written by a sink connector). I've updated the KIP with the
> > > developer-facing API changes for this logic; let me know what you
> think.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Mon, Nov 14, 2022 at 10:16 AM Mickael Maison <
> > mickael.mai...@gmail.com>
> > > wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Thanks for the update!
> > > >
> > > > It's relatively common to only want to reset offsets for a specific
> > > > resource (for example with MirrorMaker for one or a group of topics).
> > > > Could it be possible to add a way to do so? Either by providing a
> > > > payload to DELETE or by setting the offset field to an empty object
> in
> > > > the PATCH payload?
> > > >
> > > > Thanks,
> > > > Mickael
> > > >
> > > > On Sat, Nov 12, 2022 at 3:33 PM Yash Mayya <yash.ma...@gmail.com>
> > wrote:
> > > > >
> > > > > Hi Chris,
> > > > >
> > > > > Thanks for pointing out that the consumer group deletion step
> itself
> > > will
> > > > > fail in case of zombie sink tasks. Since we can't get any stronger
> > > > > guarantees from consumers (unlike with transactional producers), I
> > > think
> > > > it
> > > > > makes perfect sense to fail the offset reset attempt in such
> > scenarios
> > > > with
> > > > > a relevant error message to the user. I was more concerned about
> > > silently
> > > > > failing but it looks like that won't be an issue. It's probably
> worth
> > > > > calling out this difference between source / sink connectors
> > explicitly
> > > > in
> > > > > the KIP, what do you think?
> > > > >
> > > > > > changing the field names for sink offsets
> > > > > > from "topic", "partition", and "offset" to "Kafka
> > > > > > topic", "Kafka partition", and "Kafka offset" respectively, to
> > > > > > reduce the stuttering effect of having a "partition" field inside
> > > > > >  a "partition" field and the same with an "offset" field
> > > > >
> > > > > The KIP is still using the nested partition / offset fields by the
> > way
> > > -
> > > > > has it not been updated because we're waiting for consensus on the
> > > field
> > > > > names?
> > > > >
> > > > > > The reset-after-delete feature, on the other
> > > > > > hand, is actually pretty tricky to design; I've updated the
> > > > > > rationale in the KIP for delaying it and clarified that it's not
> > > > > > just a matter of implementation but also design work.
> > > > >
> > > > > I like the idea of writing an offset reset request to the config
> > topic
> > > > > which will be processed by the herder's config update listener -
> I'm
> > > not
> > > > > sure I fully follow the concerns with regard to handling failures?
> > Why
> > > > > can't we simply log an error saying that the offset reset for the
> > > deleted
> > > > > connector "xyz" failed due to reason "abc"? As long as it's
> > documented
> > > > that
> > > > > connector deletion and offset resets are asynchronous and a success
> > > > > response only means that the request was initiated successfully
> > (which
> > > is
> > > > > the case even today with normal connector deletion), we should be
> > fine
> > > > > right?
> > > > >
> > > > > Thanks for adding the new PATCH endpoint to the KIP, I think it's a
> > lot
> > > > > more useful for this use case than a PUT endpoint would be! One
> thing
> > > > > that I was thinking about with the new PATCH endpoint is that while
> > we
> > > > can
> > > > > easily validate the request body format for sink connectors (since
> > it's
> > > > the
> > > > > same across all connectors), we can't do the same for source
> > connectors
> > > > as
> > > > > things stand today since each source connector implementation can
> > > define
> > > > > its own source partition and offset structures. Without any
> > validation,
> > > > > writing a bad offset for a source connector via the PATCH endpoint
> > > could
> > > > > cause it to fail with hard to discern errors. I'm wondering if we
> > could
> > > > add
> > > > > a new method to the `SourceConnector` class (which should be
> > overridden
> > > > by
> > > > > source connector implementations) that would validate whether or
> not
> > > the
> > > > > provided source partitions and source offsets are valid for the
> > > connector
> > > > > (it could have a default implementation returning true
> > unconditionally
> > > > for
> > > > > backward compatibility).
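> > > > >
> > > > > As a rough sketch (hypothetical method name and signature), something
> > > > > like:
> > > > >
> > > > >     // Hypothetical addition to SourceConnector
> > > > >     public boolean validateOffsets(Map<Map<String, ?>, Map<String, ?>> offsets) {
> > > > >         // Default: accept everything, for backward compatibility
> > > > >         return true;
> > > > >     }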
> > > > >
> > > > > > I've also added an implementation plan to the KIP, which calls
> > > > > > out the different parts that can be worked on independently so
> that
> > > > > > others (hi Yash 🙂) can also tackle parts of this if they'd like.
> > > > >
> > > > > I'd be more than happy to pick up one or more of the implementation
> > > > parts,
> > > > > thanks for breaking it up into granular pieces!
> > > > >
> > > > > Thanks,
> > > > > Yash
> > > > >
> > > > > On Fri, Nov 11, 2022 at 11:25 PM Chris Egerton
> > <chr...@aiven.io.invalid
> > > >
> > > > > wrote:
> > > > >
> > > > > > Hi Mickael,
> > > > > >
> > > > > > Thanks for your feedback. This has been on my TODO list as well
> :)
> > > > > >
> > > > > > 1. That's fair! Support for altering offsets is easy enough to
> > > design,
> > > > so
> > > > > > I've added it to the KIP. The reset-after-delete feature, on the
> > > other
> > > > > > hand, is actually pretty tricky to design; I've updated the
> > rationale
> > > > in
> > > > > > the KIP for delaying it and clarified that it's not just a matter
> > of
> > > > > > implementation but also design work. If you or anyone else can
> > think
> > > > of a
> > > > > > clean, simple way to implement it, I'm happy to add it to this
> KIP,
> > > but
> > > > > > otherwise I'd prefer not to tie it to the approval and release of
> > the
> > > > > > features already proposed in the KIP.
> > > > > >
> > > > > > 2. Yeah, it's a little awkward. In my head I've justified the
> > > ugliness
> > > > of
> > > > > > the implementation with the smooth user-facing experience;
> falling
> > > back
> > > > > > seamlessly on the PAUSED state without even logging an error
> > message
> > > > is a
> > > > > > lot better than I'd initially hoped for when I was designing this
> > > > feature.
> > > > > >
> > > > > > I've also added an implementation plan to the KIP, which calls
> out
> > > the
> > > > > > different parts that can be worked on independently so that
> others
> > > (hi
> > > > Yash
> > > > > > 🙂) can also tackle parts of this if they'd like.
> > > > > >
> > > > > > Finally, I've removed the "type" field from the response body
> > format
> > > > for
> > > > > > offset read requests. This way, users can copy+paste the response
> > > from
> > > > that
> > > > > > endpoint into a request to alter a connector's offsets without
> > having
> > > > to
> > > > > > remove the "type" field first. An alternative was to keep the
> > "type"
> > > > field
> > > > > > and add it to the request body format for altering offsets, but
> > this
> > > > didn't
> > > > > > seem to make enough sense for cases not involving the
> > aforementioned
> > > > > > copy+paste process.
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
> > > > > >
> > > > > > On Wed, Nov 9, 2022 at 9:57 AM Mickael Maison <
> > > > mickael.mai...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > Thanks for the KIP, you're picking something that has been in
> my
> > > todo
> > > > > > > list for a while ;)
> > > > > > >
> > > > > > > It looks good overall, I just have a couple of questions:
> > > > > > > 1) I consider both features listed in Future Work pretty
> > important.
> > > > In
> > > > > > > both cases you mention the reason for not addressing them now
> is
> > > > > > > because of the implementation. If the design is simple and if
> we
> > > have
> > > > > > > volunteers to implement them, I wonder if we could include them
> > in
> > > > > > > this KIP. So you would not have to implement everything but we
> > > would
> > > > > > > have a single KIP and vote.
> > > > > > >
> > > > > > > 2) Regarding the backward compatibility for the stopped state.
> > The
> > > > > > > "state.v2" field is a bit unfortunate but I can't think of a
> > better
> > > > > > > solution. The other alternative would be to not do anything
> but I
> > > > > > > think the graceful degradation you propose is a bit better.
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Mickael
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > >
> > > > > > > On Tue, Nov 8, 2022 at 5:58 PM Chris Egerton
> > > <chr...@aiven.io.invalid
> > > > >
> > > > > > > wrote:
> > > > > > > >
> > > > > > > > Hi Yash,
> > > > > > > >
> > > > > > > > Good question! This is actually a subtle source of asymmetry
> in
> > > the
> > > > > > > current
> > > > > > > > proposal. Requests to delete a consumer group with active
> > members
> > > > will
> > > > > > > > fail, so if there are zombie sink tasks that are still
> > > > communicating
> > > > > > with
> > > > > > > > Kafka, offset reset requests for that connector will also
> fail.
> > > It
> > > > is
> > > > > > > > possible to use an admin client to remove all active members
> > from
> > > > the
> > > > > > > group
> > > > > > > > and then delete the group. However, this solution isn't as
> > > > complete as
> > > > > > > the
> > > > > > > > zombie fencing that we can perform for exactly-once source
> > tasks,
> > > > since
> > > > > > > > removing consumers from a group doesn't prevent them from
> > > > immediately
> > > > > > > > rejoining the group, which would either cause the group
> > deletion
> > > > > > request
> > > > > > > to
> > > > > > > > fail (if they rejoin before the group is deleted), or
> recreate
> > > the
> > > > > > group
> > > > > > > > (if they rejoin after the group is deleted).
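> > > > > > > >
> > > > > > > > For reference, that admin-client sequence would look roughly like
> > > > > > > > this (sketch only; adminProps and groupId are placeholders, and
> > > > > > > > this isn't something the KIP proposes to automate):
> > > > > > > >
> > > > > > > >     // uses org.apache.kafka.clients.admin.* and java.util.Collections
> > > > > > > >     try (Admin admin = Admin.create(adminProps)) {
> > > > > > > >         // Kick all currently-active members out of the group...
> > > > > > > >         admin.removeMembersFromConsumerGroup(groupId,
> > > > > > > >             new RemoveMembersFromConsumerGroupOptions()).all().get();
> > > > > > > >         // ...then delete the group. A zombie task can rejoin at any
> > > > > > > >         // point in between, which either fails this call or
> > > > > > > >         // recreates the group afterwards.
> > > > > > > >         admin.deleteConsumerGroups(Collections.singleton(groupId))
> > > > > > > >             .all().get();
> > > > > > > >     }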
> > > > > > > >
> > > > > > > > For ease of implementation, I'd prefer to leave the asymmetry
> > in
> > > > the
> > > > > > API
> > > > > > > > for now and fail fast and clearly if there are still
> consumers
> > > > active
> > > > > > in
> > > > > > > > the sink connector's group. We can try to detect this case
> and
> > > > provide
> > > > > > a
> > > > > > > > helpful error message to the user explaining why the offset
> > reset
> > > > > > request
> > > > > > > > has failed and some steps they can take to try to resolve
> > things
> > > > (wait
> > > > > > > for
> > > > > > > > slow task shutdown to complete, restart zombie workers and/or
> > > > workers
> > > > > > > with
> > > > > > > > blocked tasks on them). In the future we can possibly even
> > > revisit
> > > > > > > KIP-611
> > > > > > > > [1] or something like it to provide better insight into
> zombie
> > > > tasks
> > > > > > on a
> > > > > > > > worker so that it's easier to find which tasks have been
> > > abandoned
> > > > but
> > > > > > > are
> > > > > > > > still running.
> > > > > > > >
> > > > > > > > Let me know what you think; this is an important point to
> call
> > > out
> > > > and
> > > > > > if
> > > > > > > > we can reach some consensus on how to handle sink connector
> > > offset
> > > > > > resets
> > > > > > > > w/r/t zombie tasks, I'll update the KIP with the details.
> > > > > > > >
> > > > > > > > [1] -
> > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya <
> > yash.ma...@gmail.com>
> > > > > > wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > Thanks for the response and the explanations, I think
> you've
> > > > answered
> > > > > > > > > pretty much all the questions I had meticulously!
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > > if something goes wrong while resetting offsets, there's
> no
> > > > > > > > > > immediate impact--the connector will still be in the
> > STOPPED
> > > > > > > > > >  state. The REST response for requests to reset the
> offsets
> > > > > > > > > > will clearly call out that the operation has failed, and
> if
> > > > > > > necessary,
> > > > > > > > > > we can probably also add a scary-looking warning message
> > > > > > > > > > stating that we can't guarantee which offsets have been
> > > > > > successfully
> > > > > > > > > >  wiped and which haven't. Users can query the exact
> offsets
> > > of
> > > > > > > > > > the connector at this point to determine what will happen
> > > > if/what
> > > > > > > they
> > > > > > > > > > resume it. And they can repeat attempts to reset the
> > offsets
> > > as
> > > > > > many
> > > > > > > > > >  times as they'd like until they get back a 2XX response,
> > > > > > indicating
> > > > > > > > > > that it's finally safe to resume the connector. Thoughts?
> > > > > > > > >
> > > > > > > > > Yeah, I agree, the case that I mentioned earlier where a
> user
> > > > would
> > > > > > > try to
> > > > > > > > > resume a stopped connector after a failed offset reset
> > attempt
> > > > > > without
> > > > > > > > > knowing that the offset reset attempt didn't fail cleanly
> is
> > > > probably
> > > > > > > just
> > > > > > > > > an extreme edge case. I think as long as the response is
> > > verbose
> > > > > > > enough and
> > > > > > > > > self explanatory, we should be fine.
> > > > > > > > >
> > > > > > > > > Another question that I had was behavior w.r.t sink
> connector
> > > > offset
> > > > > > > resets
> > > > > > > > > when there are zombie tasks/workers in the Connect cluster
> -
> > > the
> > > > KIP
> > > > > > > > > mentions that for sink connectors offset resets will be
> done
> > by
> > > > > > > deleting
> > > > > > > > > the consumer group. However, if there are zombie tasks
> which
> > > are
> > > > > > still
> > > > > > > able
> > > > > > > > > to communicate with the Kafka cluster that the sink
> connector
> > > is
> > > > > > > consuming
> > > > > > > > > from, I think the consumer group will automatically get
> > > > re-created
> > > > > > and
> > > > > > > the
> > > > > > > > > zombie task may be able to commit offsets for the
> partitions
> > > > that it
> > > > > > is
> > > > > > > > > consuming from?
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Yash
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton
> > > > > > <chr...@aiven.io.invalid
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Yash,
> > > > > > > > > >
> > > > > > > > > > Thanks again for your thoughts! Responses to ongoing
> > > > discussions
> > > > > > > inline
> > > > > > > > > > (easier to track context than referencing comment
> numbers):
> > > > > > > > > >
> > > > > > > > > > > However, this then leads me to wonder if we can make
> that
> > > > > > explicit
> > > > > > > by
> > > > > > > > > > including "connect" or "connector" in the higher level
> > field
> > > > names?
> > > > > > > Or do
> > > > > > > > > > you think this isn't required given that we're talking
> > about
> > > a
> > > > > > > Connect
> > > > > > > > > > specific REST API in the first place?
> > > > > > > > > >
> > > > > > > > > > I think "partition" and "offset" are fine as field names
> > but
> > > > I'm
> > > > > > not
> > > > > > > > > hugely
> > > > > > > > > > opposed to adding "connector " as a prefix to them; would
> > be
> > > > > > > interested
> > > > > > > > > in
> > > > > > > > > > others' thoughts.
> > > > > > > > > >
> > > > > > > > > > > I'm not sure I followed why the unresolved writes to
> the
> > > > config
> > > > > > > topic
> > > > > > > > > > would be an issue - wouldn't the delete offsets request
> be
> > > > added to
> > > > > > > the
> > > > > > > > > > herder's request queue and whenever it is processed, we'd
> > > > anyway
> > > > > > > need to
> > > > > > > > > > check if all the prerequisites for the request are
> > satisfied?
> > > > > > > > > >
> > > > > > > > > > Some requests are handled in multiple steps. For example,
> > > > deleting
> > > > > > a
> > > > > > > > > > connector (1) adds a request to the herder queue to
> write a
> > > > > > > tombstone to
> > > > > > > > > > the config topic (or, if the worker isn't the leader,
> > forward
> > > > the
> > > > > > > request
> > > > > > > > > > to the leader). (2) Once that tombstone is picked up,
> (3) a
> > > > > > rebalance
> > > > > > > > > > ensues, and then after it's finally complete, (4) the
> > > > connector and
> > > > > > > its
> > > > > > > > > > tasks are shut down. I probably could have used better
> > > > terminology,
> > > > > > > but
> > > > > > > > > > what I meant by "unresolved writes to the config topic"
> > was a
> > > > case
> > > > > > in
> > > > > > > > > > between steps (2) and (3)--where the worker has already
> > read
> > > > that
> > > > > > > > > tombstone
> > > > > > > > > > from the config topic and knows that a rebalance is
> > pending,
> > > > but
> > > > > > > hasn't
> > > > > > > > > > begun participating in that rebalance yet. In the
> > > > DistributedHerder
> > > > > > > > > class,
> > > > > > > > > > this is done via the `checkRebalanceNeeded` method.
> > > > > > > > > >
> > > > > > > > > > > We can probably revisit this potential deprecation [of
> > the
> > > > PAUSED
> > > > > > > > > state]
> > > > > > > > > > in the future based on user feedback and how the adoption
> > of
> > > > the
> > > > > > new
> > > > > > > > > > proposed stop endpoint looks like, what do you think?
> > > > > > > > > >
> > > > > > > > > > Yeah, revisiting in the future seems reasonable. 👍
> > > > > > > > > >
> > > > > > > > > > And responses to new comments here:
> > > > > > > > > >
> > > > > > > > > > 8. Yep, we'll start tracking offsets by connector. I
> don't
> > > > believe
> > > > > > > this
> > > > > > > > > > should be too difficult, and suspect that the only reason
> > we
> > > > track
> > > > > > > raw
> > > > > > > > > byte
> > > > > > > > > > arrays instead of pre-deserializing offset topic
> > information
> > > > into
> > > > > > > > > something
> > > > > > > > > > more useful is because Connect originally had pluggable
> > > > internal
> > > > > > > > > > converters. Now that we're hardcoded to use the JSON
> > > converter
> > > > it
> > > > > > > should
> > > > > > > > > be
> > > > > > > > > > fine to track offsets on a per-connector basis as they're
> > > read
> > > > from
> > > > > > > the
> > > > > > > > > > offsets topic.
> > > > > > > > > >
> > > > > > > > > > 9. I'm hesitant to introduce this type of feature right
> now
> > > > because
> > > > > > > of
> > > > > > > > > all
> > > > > > > > > > of the gotchas that would come with it. In
> > security-conscious
> > > > > > > > > environments,
> > > > > > > > > > it's possible that a sink connector's principal may have
> > > > access to
> > > > > > > the
> > > > > > > > > > consumer group used by the connector, but the worker's
> > > > principal
> > > > > > may
> > > > > > > not.
> > > > > > > > > > There's also the case where source connectors have
> separate
> > > > offsets
> > > > > > > > > topics,
> > > > > > > > > > or sink connectors have overridden consumer group IDs, or
> > > sink
> > > > or
> > > > > > > source
> > > > > > > > > > connectors work against a different Kafka cluster than
> the
> > > one
> > > > that
> > > > > > > their
> > > > > > > > > > worker uses. Overall, I'd rather provide a single API
> that
> > > > works in
> > > > > > > all
> > > > > > > > > > cases rather than risk confusing and alienating users by
> > > > trying to
> > > > > > > make
> > > > > > > > > > their lives easier in a subset of cases.
> > > > > > > > > >
> > > > > > > > > > 10. Hmm... I don't think the order of the writes matters
> > too
> > > > much
> > > > > > > here,
> > > > > > > > > but
> > > > > > > > > > we probably could start by deleting from the global topic
> > > > first,
> > > > > > > that's
> > > > > > > > > > true. The reason I'm not hugely concerned about this case
> > is
> > > > that
> > > > > > if
> > > > > > > > > > something goes wrong while resetting offsets, there's no
> > > > immediate
> > > > > > > > > > impact--the connector will still be in the STOPPED state.
> > The
> > > > REST
> > > > > > > > > response
> > > > > > > > > > for requests to reset the offsets will clearly call out
> > that
> > > > the
> > > > > > > > > operation
> > > > > > > > > > has failed, and if necessary, we can probably also add a
> > > > > > > scary-looking
> > > > > > > > > > warning message stating that we can't guarantee which
> > offsets
> > > > have
> > > > > > > been
> > > > > > > > > > successfully wiped and which haven't. Users can query the
> > > exact
> > > > > > > offsets
> > > > > > > > > of
> > > > > > > > > > the connector at this point to determine what will happen
> > > > if/what
> > > > > > > they
> > > > > > > > > > resume it. And they can repeat attempts to reset the
> > offsets
> > > as
> > > > > > many
> > > > > > > > > times
> > > > > > > > > > as they'd like until they get back a 2XX response,
> > indicating
> > > > that
> > > > > > > it's
> > > > > > > > > > finally safe to resume the connector. Thoughts?
> > > > > > > > > >
> > > > > > > > > > 11. I haven't thought too much about it. I think
> something
> > > > like the
> > > > > > > > > > Monitorable* connectors would probably serve our needs
> > here;
> > > > we can
> > > > > > > > > > instantiate them on a running Connect cluster and then
> use
> > > > various
> > > > > > > > > handles
> > > > > > > > > > to know how many times they've been polled, committed
> > > records,
> > > > etc.
> > > > > > > If
> > > > > > > > > > necessary we can tweak those classes or even write our
> own.
> > > But
> > > > > > > anyways,
> > > > > > > > > > once that's all done, the test will be something like
> > > "create a
> > > > > > > > > connector,
> > > > > > > > > > wait for it to produce N records (each of which contains
> > some
> > > > kind
> > > > > > of
> > > > > > > > > > predictable offset), and ensure that the offsets for it
> in
> > > the
> > > > REST
> > > > > > > API
> > > > > > > > > > match up with the ones we'd expect from N records". Does
> > that
> > > > > > answer
> > > > > > > your
> > > > > > > > > > question?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <
> > > > yash.ma...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chris,
> > > > > > > > > > >
> > > > > > > > > > > 1. Thanks a lot for elaborating on this, I'm now
> > convinced
> > > > about
> > > > > > > the
> > > > > > > > > > > usefulness of the new offset reset endpoint. Regarding
> > the
> > > > > > > follow-up
> > > > > > > > > KIP
> > > > > > > > > > > for a fine-grained offset write API, I'd be happy to
> take
> > > > that on
> > > > > > > once
> > > > > > > > > > this
> > > > > > > > > > > KIP is finalized and I will definitely look forward to
> > your
> > > > > > > feedback on
> > > > > > > > > > > that one!
> > > > > > > > > > >
> > > > > > > > > > > 2. Gotcha, the motivation makes more sense to me now.
> So
> > > the
> > > > > > higher
> > > > > > > > > level
> > > > > > > > > > > partition field represents a Connect specific "logical
> > > > partition"
> > > > > > > of
> > > > > > > > > > sorts
> > > > > > > > > > > - i.e. the source partition as defined by a connector
> for
> > > > source
> > > > > > > > > > connectors
> > > > > > > > > > > and a Kafka topic + partition for sink connectors. I
> like
> > > the
> > > > > > idea
> > > > > > > of
> > > > > > > > > > > adding a Kafka prefix to the lower level
> partition/offset
> > > > (and
> > > > > > > topic)
> > > > > > > > > > > fields which basically makes it more clear (although
> > > > implicitly)
> > > > > > > that
> > > > > > > > > the
> > > > > > > > > > > higher level partition/offset field is Connect specific
> > and
> > > > not
> > > > > > the
> > > > > > > > > same
> > > > > > > > > > as
> > > > > > > > > > > what those terms represent in Kafka itself. However,
> this
> > > > then
> > > > > > > leads me
> > > > > > > > > > to
> > > > > > > > > > > wonder if we can make that explicit by including
> > "connect"
> > > or
> > > > > > > > > "connector"
> > > > > > > > > > > in the higher level field names? Or do you think this
> > isn't
> > > > > > > required
> > > > > > > > > > given
> > > > > > > > > > > that we're talking about a Connect specific REST API in
> > the
> > > > first
> > > > > > > > > place?
> > > > > > > > > > >
> > > > > > > > > > > 3. Thanks, I think the response structure definitely
> > looks
> > > > better
> > > > > > > now!
> > > > > > > > > > >
> > > > > > > > > > > 4. Interesting, I'd be curious to learn why we might
> want
> > > to
> > > > > > change
> > > > > > > > > this
> > > > > > > > > > in
> > > > > > > > > > > the future but that's probably out of scope for this
> > > > discussion.
> > > > > > > I'm
> > > > > > > > > not
> > > > > > > > > > > sure I followed why the unresolved writes to the config
> > > topic
> > > > > > > would be
> > > > > > > > > an
> > > > > > > > > > > issue - wouldn't the delete offsets request be added to
> > the
> > > > > > > herder's
> > > > > > > > > > > request queue and whenever it is processed, we'd anyway
> > > need
> > > > to
> > > > > > > check
> > > > > > > > > if
> > > > > > > > > > > all the prerequisites for the request are satisfied?
> > > > > > > > > > >
> > > > > > > > > > > 5. Thanks for elaborating that just fencing out the
> > > producer
> > > > > > still
> > > > > > > > > leaves
> > > > > > > > > > > many cases where source tasks remain hanging around and
> > > also
> > > > that
> > > > > > > we
> > > > > > > > > > anyway
> > > > > > > > > > > can't have similar data production guarantees for sink
> > > > connectors
> > > > > > > right
> > > > > > > > > > > now. I agree that it might be better to go with ease of
> > > > > > > implementation
> > > > > > > > > > and
> > > > > > > > > > > consistency for now.
> > > > > > > > > > >
> > > > > > > > > > > 6. Right, that does make sense but I still feel like
> the
> > > two
> > > > > > states
> > > > > > > > > will
> > > > > > > > > > > end up being confusing to end users who might not be
> able
> > > to
> > > > > > > discern
> > > > > > > > > the
> > > > > > > > > > > (fairly low-level) differences between them (also the
> > > > nuances of
> > > > > > > state
> > > > > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED
> > > with
> > > > the
> > > > > > > > > > > rebalancing implications as well). We can probably
> > revisit
> > > > this
> > > > > > > > > potential
> > > > > > > > > > > deprecation in the future based on user feedback and
> how
> > > the
> > > > > > > adoption
> > > > > > > > > of
> > > > > > > > > > > the new proposed stop endpoint looks like, what do you
> > > think?
> > > > > > > > > > >
> > > > > > > > > > > 7. Aha, that is completely my bad, I missed that the
> > v1/v2
> > > > state
> > > > > > is
> > > > > > > > > only
> > > > > > > > > > > applicable to the connector's target state and that we
> > > don't
> > > > need
> > > > > > > to
> > > > > > > > > > worry
> > > > > > > > > > > about the tasks since we will have an empty set of
> > tasks. I
> > > > > > think I
> > > > > > > > > was a
> > > > > > > > > > > little confused by "pause the parts of the connector
> that
> > > > they
> > > > > > are
> > > > > > > > > > > assigned" from the KIP. Thanks for clarifying that!
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Some more thoughts and questions that I had -
> > > > > > > > > > >
> > > > > > > > > > > 8. Could you elaborate on what the implementation for
> > > offset
> > > > > > reset
> > > > > > > for
> > > > > > > > > > > source connectors would look like? Currently, it
> doesn't
> > > look
> > > > > > like
> > > > > > > we
> > > > > > > > > > track
> > > > > > > > > > > all the partitions for a source connector anywhere.
> Will
> > we
> > > > need
> > > > > > to
> > > > > > > > > > > book-keep this somewhere in order to be able to emit a
> > > > tombstone
> > > > > > > record
> > > > > > > > > > for
> > > > > > > > > > > each source partition?
> > > > > > > > > > >
> > > > > > > > > > > 9. The KIP describes the offset reset endpoint as only
> > > being
> > > > > > > usable on
> > > > > > > > > > > existing connectors that are in a `STOPPED` state. Why
> > > > wouldn't
> > > > > > we
> > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > allow resetting offsets for a deleted connector which
> > seems
> > > > to
> > > > > > be a
> > > > > > > > > valid
> > > > > > > > > > > use case? Or do we plan to handle this use case only
> via
> > > the
> > > > item
> > > > > > > > > > outlined
> > > > > > > > > > > in the future work section - "Automatically delete
> > offsets
> > > > with
> > > > > > > > > > > connectors"?
> > > > > > > > > > >
> > > > > > > > > > > 10. The KIP mentions that source offsets will be reset
> > > > > > > transactionally
> > > > > > > > > > for
> > > > > > > > > > > each topic (worker global offset topic and connector
> > > specific
> > > > > > > offset
> > > > > > > > > > topic
> > > > > > > > > > > if it exists). While it obviously isn't possible to
> > > > atomically do
> > > > > > > the
> > > > > > > > > > > writes to two topics which may be on different Kafka
> > > > clusters,
> > > > > > I'm
> > > > > > > > > > > wondering about what would happen if the first
> > transaction
> > > > > > > succeeds but
> > > > > > > > > > the
> > > > > > > > > > > second one fails. I think the order of the two
> > transactions
> > > > > > matters
> > > > > > > > > here
> > > > > > > > > > -
> > > > > > > > > > > if we successfully emit tombstones to the connector
> > > specific
> > > > > > offset
> > > > > > > > > topic
> > > > > > > > > > > and fail to do so for the worker global offset topic,
> > we'll
> > > > > > > presumably
> > > > > > > > > > fail
> > > > > > > > > > > the offset delete request because the KIP mentions that
> > "A
> > > > > > request
> > > > > > > to
> > > > > > > > > > reset
> > > > > > > > > > > offsets for a source connector will only be considered
> > > > successful
> > > > > > > if
> > > > > > > > > the
> > > > > > > > > > > worker is able to delete all known offsets for that
> > > > connector, on
> > > > > > > both
> > > > > > > > > > the
> > > > > > > > > > > worker's global offsets topic and (if one is used) the
> > > > > > connector's
> > > > > > > > > > > dedicated offsets topic.". However, this will lead to
> the
> > > > > > connector
> > > > > > > > > only
> > > > > > > > > > > being able to read potentially older offsets from the
> > > worker
> > > > > > global
> > > > > > > > > > offset
> > > > > > > > > > > topic on resumption (based on the combined offset view
> > > > presented
> > > > > > as
> > > > > > > > > > > described in KIP-618 [1]). So, I think we should make
> > sure
> > > > that
> > > > > > the
> > > > > > > > > > worker
> > > > > > > > > > > global offset topic tombstoning is attempted first,
> > right?
> > > > Note
> > > > > > > that in
> > > > > > > > > > the
> > > > > > > > > > > current implementation of
> > > > `ConnectorOffsetBackingStore::set`, the
> > > > > > > > > > primary /
> > > > > > > > > > > connector specific offset store is written to first.
> > > > > > > > > > >
> > > > > > > > > > > 11. This probably isn't necessary to elaborate on in
> the
> > > KIP
> > > > > > > itself,
> > > > > > > > > but
> > > > > > > > > > I
> > > > > > > > > > > was wondering what the second offset test - "verify
> that
> > > that
> > > > > > those
> > > > > > > > > > offsets
> > > > > > > > > > > reflect an expected level of progress for each
> connector
> > > > (i.e.,
> > > > > > > they
> > > > > > > > > are
> > > > > > > > > > > greater than or equal to a certain value depending on
> how
> > > the
> > > > > > > > > connectors
> > > > > > > > > > > are configured and how long they have been running)" -
> > > would
> > > > look
> > > > > > > like?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > > [1] -
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton
> > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for your detailed thoughts.
> > > > > > > > > > > >
> > > > > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly
> > > what's
> > > > > > > proposed
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > KIP right now: a way to reset offsets for connectors.
> > > Sure,
> > > > > > > there's
> > > > > > > > > an
> > > > > > > > > > > > extra step of stopping the connector, but renaming a
> > > > connector
> > > > > > > isn't
> > > > > > > > > as
> > > > > > > > > > > > convenient of an alternative as it may seem since in
> > many
> > > > cases
> > > > > > > you'd
> > > > > > > > > > > also
> > > > > > > > > > > > want to delete the older one, so the complete
> sequence
> > of
> > > > steps
> > > > > > > would
> > > > > > > > > > be
> > > > > > > > > > > > something like delete the old connector, rename it
> > > > (possibly
> > > > > > > > > requiring
> > > > > > > > > > > > modifications to its config file, depending on which
> > API
> > > is
> > > > > > > used),
> > > > > > > > > then
> > > > > > > > > > > > create the renamed variant. It's also just not a
> great
> > > user
> > > > > > > > > > > > experience--even if the practical impacts are limited
> > > > (which,
> > > > > > > IMO,
> > > > > > > > > they
> > > > > > > > > > > are
> > > > > > > > > > > > not), people have been asking for years about why
> they
> > > > have to
> > > > > > > employ
> > > > > > > > > > > this
> > > > > > > > > > > > kind of a workaround for a fairly common use case,
> and
> > we
> > > > don't
> > > > > > > > > really
> > > > > > > > > > > have
> > > > > > > > > > > > a good answer beyond "we haven't implemented
> something
> > > > better
> > > > > > > yet".
> > > > > > > > > On
> > > > > > > > > > > top
> > > > > > > > > > > > of that, you may have external tooling that needs to
> be
> > > > tweaked
> > > > > > > to
> > > > > > > > > > > handle a
> > > > > > > > > > > > new connector name, you may have strict authorization
> > > > policies
> > > > > > > around
> > > > > > > > > > who
> > > > > > > > > > > > can access what connectors, you may have other ACLs
> > > > attached to
> > > > > > > the
> > > > > > > > > > name
> > > > > > > > > > > of
> > > > > > > > > > > > the connector (which can be especially common in the
> > case
> > > > of
> > > > > > sink
> > > > > > > > > > > > connectors, whose consumer group IDs are tied to
> their
> > > > names by
> > > > > > > > > > default),
> > > > > > > > > > > > and leaving around state in the offsets topic that
> can
> > > > never be
> > > > > > > > > cleaned
> > > > > > > > > > > up
> > > > > > > > > > > > presents a bit of a footgun for users. It may not be
> a
> > > > silver
> > > > > > > bullet,
> > > > > > > > > > but
> > > > > > > > > > > > providing some mechanism to reset that state is a
> step
> > in
> > > > the
> > > > > > > right
> > > > > > > > > > > > direction and allows responsible users to more
> > carefully
> > > > > > > administer
> > > > > > > > > > their
> > > > > > > > > > > > cluster without resorting to non-public APIs. That
> > said,
> > > I
> > > > do
> > > > > > > agree
> > > > > > > > > > that
> > > > > > > > > > > a
> > > > > > > > > > > > fine-grained reset/overwrite API would be useful, and
> > I'd
> > > > be
> > > > > > > happy to
> > > > > > > > > > > > review a KIP to add that feature if anyone wants to
> > > tackle
> > > > it!
> > > > > > > > > > > >
> > > > > > > > > > > > 2. Keeping the two formats symmetrical is motivated
> > > mostly
> > > > by
> > > > > > > > > > aesthetics
> > > > > > > > > > > > and quality-of-life for programmatic interaction with
> > the
> > > > API;
> > > > > > > it's
> > > > > > > > > not
> > > > > > > > > > > > really a goal to hide the use of consumer groups from
> > > > users. I
> > > > > > do
> > > > > > > > > agree
> > > > > > > > > > > > that the format is a little strange-looking for sink
> > > > > > connectors,
> > > > > > > but
> > > > > > > > > it
> > > > > > > > > > > > seemed like it would be easier to work with for UIs,
> > > > casual jq
> > > > > > > > > queries,
> > > > > > > > > > > and
> > > > > > > > > > > > CLIs than a more Kafka-specific alternative such as
> > > > > > > > > > > {"<topic>-<partition>":
> > > > > > > > > > > > "<offset>"}, and although it is a little strange, I
> > don't
> > > > think
> > > > > > > it's
> > > > > > > > > > any
> > > > > > > > > > > > less readable or intuitive. That said, I've made some
> > > > tweaks to
> > > > > > > the
> > > > > > > > > > > format
> > > > > > > > > > > > that should make programmatic access even easier;
> > > > specifically,
> > > > > > > I've
> > > > > > > > > > > > removed the "source" and "sink" wrapper fields and
> > > instead
> > > > > > moved
> > > > > > > them
> > > > > > > > > > > into
> > > > > > > > > > > > a top-level object with a "type" and "offsets" field,
> > > just
> > > > like
> > > > > > > you
> > > > > > > > > > > > suggested in point 3 (thanks!). We might also
> consider
> > > > changing
> > > > > > > the
> > > > > > > > > > field
> > > > > > > > > > > > names for sink offsets from "topic", "partition", and
> > > > "offset"
> > > > > > to
> > > > > > > > > > "Kafka
> > > > > > > > > > > > topic", "Kafka partition", and "Kafka offset"
> > > > respectively, to
> > > > > > > reduce
> > > > > > > > > > the
> > > > > > > > > > > > stuttering effect of having a "partition" field
> inside
> > a
> > > > > > > "partition"
> > > > > > > > > > > field
> > > > > > > > > > > > and the same with an "offset" field; thoughts? One
> > final
> > > > > > > point--by
> > > > > > > > > > > equating
> > > > > > > > > > > > source and sink offsets, we probably make it easier
> for
> > > > users
> > > > > > to
> > > > > > > > > > > understand
> > > > > > > > > > > > exactly what a source offset is; anyone who's
> familiar
> > > with
> > > > > > > consumer
> > > > > > > > > > > > offsets can see from the response format that we
> > > identify a
> > > > > > > logical
> > > > > > > > > > > > partition as a combination of two entities (a topic
> > and a
> > > > > > > partition
> > > > > > > > > > > > number); it should make it easier to grok what a
> source
> > > > offset
> > > > > > > is by
> > > > > > > > > > > seeing
> > > > > > > > > > > > what the two formats have in common.
> > > > > > > > > > > >
> > > > > > > > > > > > 3. Great idea! Done.
> > > > > > > > > > > >
> > > > > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the
> > > > response
> > > > > > > status
> > > > > > > > > > if
> > > > > > > > > > > a
> > > > > > > > > > > > rebalance is pending. I'd rather not add this to the
> > KIP
> > > > as we
> > > > > > > may
> > > > > > > > > want
> > > > > > > > > > > to
> > > > > > > > > > > > change it at some point and it doesn't seem vital to
> > > > establish
> > > > > > > it as
> > > > > > > > > > part
> > > > > > > > > > > > of the public contract for the new endpoint right
> now.
> > > > Also,
> > > > > > > small
> > > > > > > > > > > > point--yes, a 409 is useful to avoid forwarding
> > requests
> > > > to an
> > > > > > > > > > incorrect
> > > > > > > > > > > > leader, but it's also useful to ensure that there
> > aren't
> > > > any
> > > > > > > > > unresolved
> > > > > > > > > > > > writes to the config topic that might cause issues
> with
> > > the
> > > > > > > request
> > > > > > > > > > (such
> > > > > > > > > > > > as deleting the connector).
> > > > > > > > > > > >
> > > > > > > > > > > > 5. That's a good point--it may be misleading to call
> a
> > > > > > connector
> > > > > > > > > > STOPPED
> > > > > > > > > > > > when it has zombie tasks lying around on the
> cluster. I
> > > > don't
> > > > > > > think
> > > > > > > > > > it'd
> > > > > > > > > > > be
> > > > > > > > > > > > appropriate to do this synchronously while handling
> > > > requests to
> > > > > > > the
> > > > > > > > > PUT
> > > > > > > > > > > > /connectors/{connector}/stop since we'd want to give
> > all
> > > > > > > > > > > currently-running
> > > > > > > > > > > > tasks a chance to gracefully shut down, though. I'm
> > also
> > > > not
> > > > > > sure
> > > > > > > > > that
> > > > > > > > > > > this
> > > > > > > > > > > > is a significant problem, either. If the connector is
> > > > resumed,
> > > > > > > then
> > > > > > > > > all
> > > > > > > > > > > > zombie tasks will be automatically fenced out by
> their
> > > > > > > successors on
> > > > > > > > > > > > startup; if it's deleted, then we'll have wasted
> effort
> > > by
> > > > > > > performing
> > > > > > > > > > an
> > > > > > > > > > > > unnecessary round of fencing. It may be nice to
> > guarantee
> > > > that
> > > > > > > source
> > > > > > > > > > > task
> > > > > > > > > > > > resources will be deallocated after the connector
> > > > transitions
> > > > > > to
> > > > > > > > > > STOPPED,
> > > > > > > > > > > > but realistically, it doesn't do much to just fence
> out
> > > > their
> > > > > > > > > > producers,
> > > > > > > > > > > > since tasks can be blocked on a number of other
> > > operations
> > > > such
> > > > > > > as
> > > > > > > > > > > > key/value/header conversion, transformation, and task
> > > > polling.
> > > > > > > It may
> > > > > > > > > > be
> > > > > > > > > > > a
> > > > > > > > > > > > little strange if data is produced to Kafka after the
> > > > connector
> > > > > > > has
> > > > > > > > > > > > transitioned to STOPPED, but we can't provide the
> same
> > > > > > > guarantees for
> > > > > > > > > > > sink
> > > > > > > > > > > > connectors, since their tasks may be stuck on a
> > > > long-running
> > > > > > > > > > > SinkTask::put
> > > > > > > > > > > > that emits data even after the Connect framework has
> > > > abandoned
> > > > > > > them
> > > > > > > > > > after
> > > > > > > > > > > > exhausting their graceful shutdown timeout.
> Ultimately,
> > > I'd
> > > > > > > prefer to
> > > > > > > > > > err
> > > > > > > > > > > > on the side of consistency and ease of implementation
> > for
> > > > now,
> > > > > > > but I
> > > > > > > > > > may
> > > > > > > > > > > be
> > > > > > > > > > > > missing a case where a few extra records from a task
> > > that's
> > > > > > slow
> > > > > > > to
> > > > > > > > > > shut
> > > > > > > > > > > > down may cause serious issues--let me know.
> > > > > > > > > > > >
> > > > > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED
> > > state
> > > > > > right
> > > > > > > now
> > > > > > > > > as
> > > > > > > > > > > it
> > > > > > > > > > > > does serve a few purposes. Leaving tasks
> > idling-but-ready
> > > > makes
> > > > > > > > > > resuming
> > > > > > > > > > > > them less disruptive across the cluster, since a
> > > rebalance
> > > > > > isn't
> > > > > > > > > > > necessary.
> > > > > > > > > > > > It also reduces latency to resume the connector,
> > > > especially for
> > > > > > > ones
> > > > > > > > > > that
> > > > > > > > > > > > have to do a lot of state gathering on initialization
> > to,
> > > > e.g.,
> > > > > > > read
> > > > > > > > > > > > offsets from an external system.
> > > > > > > > > > > >
> > > > > > > > > > > > 7. There should be no risk of mixed tasks after a
> > > > downgrade,
> > > > > > > thanks
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > empty set of task configs that gets published to the
> > > config
> > > > > > > topic.
> > > > > > > > > Both
> > > > > > > > > > > > upgraded and downgraded workers will render an empty
> > set
> > > of
> > > > > > > tasks for
> > > > > > > > > > the
> > > > > > > > > > > > connector, and keep that set of empty tasks until the
> > > > connector
> > > > > > > is
> > > > > > > > > > > resumed.
> > > > > > > > > > > > Does that address your concerns?
> > > > > > > > > > > >
> > > > > > > > > > > > You're also correct that the linked Jira ticket was
> > > wrong;
> > > > > > > thanks for
> > > > > > > > > > > > pointing that out! Yes, KAFKA-4107 is the intended
> > > ticket,
> > > > and
> > > > > > > I've
> > > > > > > > > > > updated
> > > > > > > > > > > > the link in the KIP accordingly.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
> > > > > > > > > > > >
> > > > > > > > > > > > [1] -
> https://issues.apache.org/jira/browse/KAFKA-4107
> > > > > > > > > > > >
> > > > > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <
> > > > > > > yash.ma...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks a lot for this KIP, I think something like
> > this
> > > > has
> > > > > > been
> > > > > > > > > long
> > > > > > > > > > > > > overdue for Kafka Connect :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Some thoughts and questions that I had -
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. I'm wondering if you could elaborate a little
> more
> > > on
> > > > the
> > > > > > > use
> > > > > > > > > case
> > > > > > > > > > > for
> > > > > > > > > > > > > the `DELETE /connectors/{connector}/offsets` API. I
> > > > think we
> > > > > > > can
> > > > > > > > > all
> > > > > > > > > > > > agree
> > > > > > > > > > > > > that a fine grained reset API that allows setting
> > > > arbitrary
> > > > > > > offsets
> > > > > > > > > > for
> > > > > > > > > > > > > partitions would be quite useful (which you talk
> > about
> > > > in the
> > > > > > > > > Future
> > > > > > > > > > > work
> > > > > > > > > > > > > section). But for the `DELETE
> > > > > > /connectors/{connector}/offsets`
> > > > > > > API
> > > > > > > > > in
> > > > > > > > > > > its
> > > > > > > > > > > > > described form, it looks like it would only serve a
> > > > seemingly
> > > > > > > niche
> > > > > > > > > > use
> > > > > > > > > > > > > case where users want to avoid renaming connectors
> -
> > > > because
> > > > > > > this
> > > > > > > > > new
> > > > > > > > > > > way
> > > > > > > > > > > > > of resetting offsets actually has more steps (i.e.
> > stop
> > > > the
> > > > > > > > > > connector,
> > > > > > > > > > > > > reset offsets via the API, resume the connector)
> than
> > > > simply
> > > > > > > > > deleting
> > > > > > > > > > > and
> > > > > > > > > > > > > re-creating the connector with a different name?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. The KIP talks about taking care that the
> response
> > > > formats
> > > > > > > > > > > (presumably
> > > > > > > > > > > > > only talking about the new GET API here) are
> > > symmetrical
> > > > for
> > > > > > > both
> > > > > > > > > > > source
> > > > > > > > > > > > > and sink connectors - is the end goal to have users
> > of
> > > > Kafka
> > > > > > > > > Connect
> > > > > > > > > > > not
> > > > > > > > > > > > > even be aware that sink connectors use Kafka
> > consumers
> > > > under
> > > > > > > the
> > > > > > > > > hood
> > > > > > > > > > > > (i.e.
> > > > > > > > > > > > > have that as purely an implementation detail
> > abstracted
> > > > away
> > > > > > > from
> > > > > > > > > > > users)?
> > > > > > > > > > > > > While I understand the value of uniformity here,
> the
> > > > response
> > > > > > > > > format
> > > > > > > > > > > for
> > > > > > > > > > > > > sink connectors currently looks a little odd with
> the
> > > > > > > "partition"
> > > > > > > > > > field
> > > > > > > > > > > > > having "topic" and "partition" as sub-fields,
> > > especially
> > > > to
> > > > > > > users
> > > > > > > > > > > > familiar
> > > > > > > > > > > > > with Kafka semantics. Thoughts?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3. Another little nitpick on the response format -
> > why
> > > > do we
> > > > > > > need
> > > > > > > > > > > > "source"
> > > > > > > > > > > > > / "sink" as a field under "offsets"? Users can
> query
> > > the
> > > > > > > connector
> > > > > > > > > > type
> > > > > > > > > > > > via
> > > > > > > > > > > > > the existing `GET /connectors` API. If it's deemed
> > > > important
> > > > > > > to let
> > > > > > > > > > > users
> > > > > > > > > > > > > know that the offsets they're seeing correspond to
> a
> > > > source /
> > > > > > > sink
> > > > > > > > > > > > > connector, maybe we could have a top level field
> > "type"
> > > > in
> > > > > > the
> > > > > > > > > > response
> > > > > > > > > > > > for
> > > > > > > > > > > > > the `GET /connectors/{connector}/offsets` API
> similar
> > > to
> > > > the
> > > > > > > `GET
> > > > > > > > > > > > > /connectors` API?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets`
> > > API,
> > > > the
> > > > > > > KIP
> > > > > > > > > > > mentions
> > > > > > > > > > > > > that requests will be rejected if a rebalance is
> > > pending
> > > > -
> > > > > > > > > presumably
> > > > > > > > > > > > this
> > > > > > > > > > > > > is to avoid forwarding requests to a leader which
> may
> > > no
> > > > > > > longer be
> > > > > > > > > > the
> > > > > > > > > > > > > leader after the pending rebalance? In this case,
> the
> > > API
> > > > > > will
> > > > > > > > > > return a
> > > > > > > > > > > > > `409 Conflict` response similar to some of the
> > existing
> > > > APIs,
> > > > > > > > > right?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 5. Regarding fencing out previously running tasks
> > for a
> > > > > > > connector,
> > > > > > > > > do
> > > > > > > > > > > you
> > > > > > > > > > > > > think it would make more sense semantically to have
> > > this
> > > > > > > > > implemented
> > > > > > > > > > in
> > > > > > > > > > > > the
> > > > > > > > > > > > > stop endpoint where an empty set of tasks is
> > generated,
> > > > > > rather
> > > > > > > than
> > > > > > > > > > the
> > > > > > > > > > > > > delete offsets endpoint? This would also give the
> new
> > > > > > `STOPPED`
> > > > > > > > > > state a
> > > > > > > > > > > > > higher confidence of sorts, with any zombie tasks
> > being
> > > > > > fenced
> > > > > > > off
> > > > > > > > > > from
> > > > > > > > > > > > > continuing to produce data.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 6. Thanks for outlining the issues with the current
> > > > state of
> > > > > > > the
> > > > > > > > > > > `PAUSED`
> > > > > > > > > > > > > state - I think a lot of users expect it to behave
> > like
> > > > the
> > > > > > > > > `STOPPED`
> > > > > > > > > > > > state
> > > > > > > > > > > > > you outline in the KIP and are (unpleasantly)
> > surprised
> > > > when
> > > > > > it
> > > > > > > > > > > doesn't.
> > > > > > > > > > > > > However, this does beg the question of what the
> > > > usefulness of
> > > > > > > > > having
> > > > > > > > > > > two
> > > > > > > > > > > > > separate `PAUSED` and `STOPPED` states is? Do we
> want
> > > to
> > > > > > > continue
> > > > > > > > > > > > > supporting both these states in the future, or do
> you
> > > > see the
> > > > > > > > > > `STOPPED`
> > > > > > > > > > > > > state eventually causing the existing `PAUSED`
> state
> > to
> > > > be
> > > > > > > > > > deprecated?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 7. I think the idea outlined in the KIP for
> handling
> > a
> > > > new
> > > > > > > state
> > > > > > > > > > during
> > > > > > > > > > > > > cluster downgrades / rolling upgrades is quite
> > clever,
> > > > but do
> > > > > > > you
> > > > > > > > > > think
> > > > > > > > > > > > > there could be any issues with having a mix of
> > "paused"
> > > > and
> > > > > > > > > "stopped"
> > > > > > > > > > > > tasks
> > > > > > > > > > > > > for the same connector across workers in a cluster?
> > At
> > > > the
> > > > > > very
> > > > > > > > > > least,
> > > > > > > > > > > I
> > > > > > > > > > > > > think it would be fairly confusing to most users.
> I'm
> > > > > > > wondering if
> > > > > > > > > > this
> > > > > > > > > > > > can
> > > > > > > > > > > > > be avoided by stating clearly in the KIP that the
> new
> > > > `PUT
> > > > > > > > > > > > > /connectors/{connector}/stop`
> > > > > > > > > > > > > can only be used on a cluster that is fully
> upgraded
> > to
> > > > an AK
> > > > > > > > > version
> > > > > > > > > > > > newer
> > > > > > > > > > > > > than the one which ends up containing changes from
> > this
> > > > KIP
> > > > > > and
> > > > > > > > > that
> > > > > > > > > > > if a
> > > > > > > > > > > > > cluster needs to be downgraded to an older version,
> > the
> > > > user
> > > > > > > should
> > > > > > > > > > > > ensure
> > > > > > > > > > > > > that none of the connectors on the cluster are in a
> > > > stopped
> > > > > > > state?
> > > > > > > > > > With
> > > > > > > > > > > > the
> > > > > > > > > > > > > existing implementation, it looks like an
> > > unknown/invalid
> > > > > > > target
> > > > > > > > > > state
> > > > > > > > > > > > > record is basically just discarded (with an error
> > > message
> > > > > > > logged),
> > > > > > > > > so
> > > > > > > > > > > it
> > > > > > > > > > > > > doesn't seem to be a disastrous failure scenario
> that
> > > can
> > > > > > bring
> > > > > > > > > down
> > > > > > > > > > a
> > > > > > > > > > > > > worker.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yash
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton
> > > > > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Ashwin,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your thoughts. Regarding your
> questions:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. The response would show the offsets that are
> > > > visible to
> > > > > > > the
> > > > > > > > > > source
> > > > > > > > > > > > > > connector, so it would combine the contents of
> the
> > > two
> > > > > > > topics,
> > > > > > > > > > giving
> > > > > > > > > > > > > > priority to offsets present in the
> > connector-specific
> > > > > > topic.
> > > > > > > I'm
> > > > > > > > > > > > > imagining
> > > > > > > > > > > > > > a follow-up question that some people may have in
> > > > response
> > > > > > to
> > > > > > > > > that
> > > > > > > > > > is
> > > > > > > > > > > > > > whether we'd want to provide insight into the
> > > contents
> > > > of a
> > > > > > > > > single
> > > > > > > > > > > > topic
> > > > > > > > > > > > > at
> > > > > > > > > > > > > > a time. It may be useful to be able to see this
> > > > information
> > > > > > > in
> > > > > > > > > > order
> > > > > > > > > > > to
> > > > > > > > > > > > > > debug connector issues or verify that it's safe
> to
> > > stop
> > > > > > > using a
> > > > > > > > > > > > > > connector-specific offsets topic (either
> > explicitly,
> > > or
> > > > > > > > > implicitly
> > > > > > > > > > > via
> > > > > > > > > > > > > > cluster downgrade). What do you think about
> adding
> > a
> > > > URL
> > > > > > > query
> > > > > > > > > > > > parameter
> > > > > > > > > > > > > > that allows users to dictate which view of the
> > > > connector's
> > > > > > > > > offsets
> > > > > > > > > > > they
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > given in the REST response, with options for the
> > > > worker's
> > > > > > > global
> > > > > > > > > > > topic,
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > connector-specific topic, and the combined view
> of
> > > them
> > > > > > that
> > > > > > > the
> > > > > > > > > > > > > connector
> > > > > > > > > > > > > > and its tasks see (which would be the default)?
> > This
> > > > may be
> > > > > > > too
> > > > > > > > > > much
> > > > > > > > > > > > for
> > > > > > > > > > > > > V1
> > > > > > > > > > > > > > but it feels like it's at least worth exploring a
> > > bit.
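> > > > > > > > > > > > > >
> > > > > > > > > > > > > > As a rough sketch (the parameter name here is purely
> > > > > > > > > > > > > > hypothetical), that could look like:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >   GET /connectors/{connector}/offsets?view=connector
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > with "global" and "combined" as the other accepted values
> > > > > > > > > > > > > > and "combined" as the default.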
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. There is no option for this at the moment.
> Reset
> > > > > > > semantics are
> > > > > > > > > > > > > extremely
> > > > > > > > > > > > > > coarse-grained; for source connectors, we delete
> > all
> > > > source
> > > > > > > > > > offsets,
> > > > > > > > > > > > and
> > > > > > > > > > > > > > for sink connectors, we delete the entire
> consumer
> > > > group.
> > > > > > I'm
> > > > > > > > > > hoping
> > > > > > > > > > > > this
> > > > > > > > > > > > > > will be enough for V1 and that, if there's
> > sufficient
> > > > > > demand
> > > > > > > for
> > > > > > > > > > it,
> > > > > > > > > > > we
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > introduce a richer API for resetting or even
> > > modifying
> > > > > > > connector
> > > > > > > > > > > > offsets
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > a follow-up KIP.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 3. Good eye :) I think it's fine to keep the
> > existing
> > > > > > > behavior
> > > > > > > > > for
> > > > > > > > > > > the
> > > > > > > > > > > > > > PAUSED state with the Connector instance, since
> the
> > > > primary
> > > > > > > > > purpose
> > > > > > > > > > > of
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > Connector is to generate task configs and monitor
> > the
> > > > > > > external
> > > > > > > > > > system
> > > > > > > > > > > > for
> > > > > > > > > > > > > > changes. If there's no chance for tasks to be
> > running
> > > > > > > anyways, I
> > > > > > > > > > > don't
> > > > > > > > > > > > > see
> > > > > > > > > > > > > > much value in allowing paused connectors to
> > generate
> > > > new
> > > > > > task
> > > > > > > > > > > configs,
> > > > > > > > > > > > > > especially since each time that happens a
> rebalance
> > > is
> > > > > > > triggered
> > > > > > > > > > and
> > > > > > > > > > > > > > there's a non-zero cost to that. What do you
> think?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin
> > > > > > > > > > <apan...@confluent.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the KIP, Chris - I think this is a useful
> > > > feature.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Can you please elaborate on the following in
> the
> > > KIP
> > > > -
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. How would the response of GET
> > > > > > > > > /connectors/{connector}/offsets
> > > > > > > > > > > look
> > > > > > > > > > > > > > like
> > > > > > > > > > > > > > > if the worker has both global and connector
> > > specific
> > > > > > > offsets
> > > > > > > > > > topic
> > > > > > > > > > > ?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2. How can we pass the reset options like
> > shift-by
> > > ,
> > > > > > > > > to-date-time
> > > > > > > > > > > > etc.
> > > > > > > > > > > > > > > using a REST API like DELETE
> > > > > > > /connectors/{connector}/offsets ?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 3. Today PAUSE operation on a connector invokes
> > its
> > > > stop
> > > > > > > > > method -
> > > > > > > > > > > > will
> > > > > > > > > > > > > > > there be a change here to reduce confusion with
> > the
> > > > new
> > > > > > > > > proposed
> > > > > > > > > > > > > STOPPED
> > > > > > > > > > > > > > > state ?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Ashwin
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton
> > > > > > > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I noticed a fairly large gap in the first
> > version
> > > > of
> > > > > > > this KIP
> > > > > > > > > > > that
> > > > > > > > > > > > I
> > > > > > > > > > > > > > > > published last Friday, which has to do with
> > > > > > accommodating
> > > > > > > > > > > > connectors
> > > > > > > > > > > > > > > > that target different Kafka clusters than the
> > one
> > > > that
> > > > > > > the
> > > > > > > > > > Kafka
> > > > > > > > > > > > > > Connect
> > > > > > > > > > > > > > > > cluster uses for its internal topics and
> source
> > > > > > > connectors
> > > > > > > > > with
> > > > > > > > > > > > > > dedicated
> > > > > > > > > > > > > > > > offsets topics. I've since updated the KIP to
> > > > address
> > > > > > > this
> > > > > > > > > gap,
> > > > > > > > > > > > which
> > > > > > > > > > > > > > has
> > > > > > > > > > > > > > > > substantially altered the design. Wanted to
> > give
> > > a
> > > > > > > heads-up
> > > > > > > > > to
> > > > > > > > > > > > anyone
> > > > > > > > > > > > > > > > that's already started reviewing.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton
> <
> > > > > > > > > chr...@aiven.io>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'd like to begin discussion on a KIP to
> add
> > > > offsets
> > > > > > > > > support
> > > > > > > > > > to
> > > > > > > > > > > > the
> > > > > > > > > > > > > > > Kafka
> > > > > > > > > > > > > > > > > Connect REST API:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton
> > > > > > <chr...@aiven.io.invalid
> > > > > > > >
> > > > > > > > > wrote:
> > > > > > > > >
> > > > > > > > > > Hi Yash,
> > > > > > > > > >
> > > > > > > > > > Thanks again for your thoughts! Responses to ongoing
> > > > discussions
> > > > > > > inline
> > > > > > > > > > (easier to track context than referencing comment
> numbers):
> > > > > > > > > >
> > > > > > > > > > > However, this then leads me to wonder if we can make
> that
> > > > > > explicit
> > > > > > > by
> > > > > > > > > > including "connect" or "connector" in the higher level
> > field
> > > > names?
> > > > > > > Or do
> > > > > > > > > > you think this isn't required given that we're talking
> > about
> > > a
> > > > > > > Connect
> > > > > > > > > > specific REST API in the first place?
> > > > > > > > > >
> > > > > > > > > > I think "partition" and "offset" are fine as field names
> > but
> > > > I'm
> > > > > > not
> > > > > > > > > hugely
> > > > > > > > > > opposed to adding "connector " as a prefix to them; would
> > be
> > > > > > > interested
> > > > > > > > > in
> > > > > > > > > > others' thoughts.
> > > > > > > > > >
> > > > > > > > > > > I'm not sure I followed why the unresolved writes to
> the
> > > > config
> > > > > > > topic
> > > > > > > > > > would be an issue - wouldn't the delete offsets request
> be
> > > > added to
> > > > > > > the
> > > > > > > > > > herder's request queue and whenever it is processed, we'd
> > > > anyway
> > > > > > > need to
> > > > > > > > > > check if all the prerequisites for the request are
> > satisfied?
> > > > > > > > > >
> > > > > > > > > > Some requests are handled in multiple steps. For example,
> > > > deleting
> > > > > > a
> > > > > > > > > > connector (1) adds a request to the herder queue to
> write a
> > > > > > > tombstone to
> > > > > > > > > > the config topic (or, if the worker isn't the leader,
> > forward
> > > > the
> > > > > > > request
> > > > > > > > > > to the leader). (2) Once that tombstone is picked up,
> (3) a
> > > > > > rebalance
> > > > > > > > > > ensues, and then after it's finally complete, (4) the
> > > > connector and
> > > > > > > its
> > > > > > > > > > tasks are shut down. I probably could have used better
> > > > terminology,
> > > > > > > but
> > > > > > > > > > what I meant by "unresolved writes to the config topic"
> > was a
> > > > case
> > > > > > in
> > > > > > > > > > between steps (2) and (3)--where the worker has already
> > read
> > > > that
> > > > > > > > > tombstone
> > > > > > > > > > from the config topic and knows that a rebalance is
> > pending,
> > > > but
> > > > > > > hasn't
> > > > > > > > > > begun participating in that rebalance yet. In the
> > > > DistributedHerder
> > > > > > > > > class,
> > > > > > > > > > this is done via the `checkRebalanceNeeded` method.
> > > > > > > > > >
> > > > > > > > > > > We can probably revisit this potential deprecation [of
> > the
> > > > PAUSED
> > > > > > > > > state]
> > > > > > > > > > in the future based on user feedback and what the adoption
> > of
> > > > the
> > > > > > new
> > > > > > > > > > proposed stop endpoint looks like, what do you think?
> > > > > > > > > >
> > > > > > > > > > Yeah, revisiting in the future seems reasonable. 👍
> > > > > > > > > >
> > > > > > > > > > And responses to new comments here:
> > > > > > > > > >
> > > > > > > > > > 8. Yep, we'll start tracking offsets by connector. I
> don't
> > > > believe
> > > > > > > this
> > > > > > > > > > should be too difficult, and suspect that the only reason
> > we
> > > > track
> > > > > > > raw
> > > > > > > > > byte
> > > > > > > > > > arrays instead of pre-deserializing offset topic
> > information
> > > > into
> > > > > > > > > something
> > > > > > > > > > more useful is because Connect originally had pluggable
> > > > internal
> > > > > > > > > > converters. Now that we're hardcoded to use the JSON
> > > converter
> > > > it
> > > > > > > should
> > > > > > > > > be
> > > > > > > > > > fine to track offsets on a per-connector basis as they're
> > > read
> > > > from
> > > > > > > the
> > > > > > > > > > offsets topic.
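> > > > > > > > > >
> > > > > > > > > > For context (with made-up connector, partition, and offset
> > > > > > > > > > values), a record in the offsets topic looks roughly like:
> > > > > > > > > >
> > > > > > > > > >   key:   ["file-source", {"filename": "/tmp/input.txt"}]
> > > > > > > > > >   value: {"position": 4790}
> > > > > > > > > >
> > > > > > > > > > i.e. keyed by connector name, which is what makes grouping the
> > > > > > > > > > deserialized offsets on a per-connector basis straightforward.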
> > > > > > > > > >
> > > > > > > > > > 9. I'm hesitant to introduce this type of feature right
> now
> > > > because
> > > > > > > of
> > > > > > > > > all
> > > > > > > > > > of the gotchas that would come with it. In
> > security-conscious
> > > > > > > > > environments,
> > > > > > > > > > it's possible that a sink connector's principal may have
> > > > access to
> > > > > > > the
> > > > > > > > > > consumer group used by the connector, but the worker's
> > > > principal
> > > > > > may
> > > > > > > not.
> > > > > > > > > > There's also the case where source connectors have
> separate
> > > > offsets
> > > > > > > > > topics,
> > > > > > > > > > or sink connectors have overridden consumer group IDs, or
> > > sink
> > > > or
> > > > > > > source
> > > > > > > > > > connectors work against a different Kafka cluster than
> the
> > > one
> > > > that
> > > > > > > their
> > > > > > > > > > worker uses. Overall, I'd rather provide a single API
> that
> > > > works in
> > > > > > > all
> > > > > > > > > > cases rather than risk confusing and alienating users by
> > > > trying to
> > > > > > > make
> > > > > > > > > > their lives easier in a subset of cases.
> > > > > > > > > >
> > > > > > > > > > 10. Hmm... I don't think the order of the writes matters
> > too
> > > > much
> > > > > > > here,
> > > > > > > > > but
> > > > > > > > > > we probably could start by deleting from the global topic
> > > > first,
> > > > > > > that's
> > > > > > > > > > true. The reason I'm not hugely concerned about this case
> > is
> > > > that
> > > > > > if
> > > > > > > > > > something goes wrong while resetting offsets, there's no
> > > > immediate
> > > > > > > > > > impact--the connector will still be in the STOPPED state.
> > The
> > > > REST
> > > > > > > > > response
> > > > > > > > > > for requests to reset the offsets will clearly call out
> > that
> > > > the
> > > > > > > > > operation
> > > > > > > > > > has failed, and if necessary, we can probably also add a
> > > > > > > scary-looking
> > > > > > > > > > warning message stating that we can't guarantee which
> > offsets
> > > > have
> > > > > > > been
> > > > > > > > > > successfully wiped and which haven't. Users can query the
> > > exact
> > > > > > > offsets
> > > > > > > > > of
> > > > > > > > > > the connector at this point to determine what will happen
> > > > if/what
> > > > > > > they
> > > > > > > > > > resume it. And they can repeat attempts to reset the
> > offsets
> > > as
> > > > > > many
> > > > > > > > > times
> > > > > > > > > > as they'd like until they get back a 2XX response,
> > indicating
> > > > that
> > > > > > > it's
> > > > > > > > > > finally safe to resume the connector. Thoughts?
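> > > > > > > > > >
> > > > > > > > > > As a usage sketch (with a made-up connector name), the full flow
> > > > > > > > > > would be something like:
> > > > > > > > > >
> > > > > > > > > >   PUT    /connectors/my-source/stop
> > > > > > > > > >   DELETE /connectors/my-source/offsets   (retry until 2XX)
> > > > > > > > > >   GET    /connectors/my-source/offsets   (sanity check)
> > > > > > > > > >   PUT    /connectors/my-source/resume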
> > > > > > > > > >
> > > > > > > > > > 11. I haven't thought too much about it. I think
> something
> > > > like the
> > > > > > > > > > Monitorable* connectors would probably serve our needs
> > here;
> > > > we can
> > > > > > > > > > instantiate them on a running Connect cluster and then
> use
> > > > various
> > > > > > > > > handles
> > > > > > > > > > to know how many times they've been polled, committed
> > > records,
> > > > etc.
> > > > > > > If
> > > > > > > > > > necessary we can tweak those classes or even write our
> own.
> > > But
> > > > > > > anyways,
> > > > > > > > > > once that's all done, the test will be something like
> > > "create a
> > > > > > > > > connector,
> > > > > > > > > > wait for it to produce N records (each of which contains
> > some
> > > > kind
> > > > > > of
> > > > > > > > > > predictable offset), and ensure that the offsets for it
> in
> > > the
> > > > REST
> > > > > > > API
> > > > > > > > > > match up with the ones we'd expect from N records". Does
> > that
> > > > > > answer
> > > > > > > your
> > > > > > > > > > question?
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <
> > > > yash.ma...@gmail.com>
> > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chris,
> > > > > > > > > > >
> > > > > > > > > > > 1. Thanks a lot for elaborating on this, I'm now
> > convinced
> > > > about
> > > > > > > the
> > > > > > > > > > > usefulness of the new offset reset endpoint. Regarding
> > the
> > > > > > > follow-up
> > > > > > > > > KIP
> > > > > > > > > > > for a fine-grained offset write API, I'd be happy to
> take
> > > > that on
> > > > > > > once
> > > > > > > > > > this
> > > > > > > > > > > KIP is finalized and I will definitely look forward to
> > your
> > > > > > > feedback on
> > > > > > > > > > > that one!
> > > > > > > > > > >
> > > > > > > > > > > 2. Gotcha, the motivation makes more sense to me now.
> So
> > > the
> > > > > > higher
> > > > > > > > > level
> > > > > > > > > > > partition field represents a Connect specific "logical
> > > > partition"
> > > > > > > of
> > > > > > > > > > sorts
> > > > > > > > > > > - i.e. the source partition as defined by a connector
> for
> > > > source
> > > > > > > > > > connectors
> > > > > > > > > > > and a Kafka topic + partition for sink connectors. I
> like
> > > the
> > > > > > idea
> > > > > > > of
> > > > > > > > > > > adding a Kafka prefix to the lower level
> partition/offset
> > > > (and
> > > > > > > topic)
> > > > > > > > > > > fields which basically makes it more clear (although
> > > > implicitly)
> > > > > > > that
> > > > > > > > > the
> > > > > > > > > > > higher level partition/offset field is Connect specific
> > and
> > > > not
> > > > > > the
> > > > > > > > > same
> > > > > > > > > > as
> > > > > > > > > > > what those terms represent in Kafka itself. However,
> this
> > > > then
> > > > > > > leads me
> > > > > > > > > > to
> > > > > > > > > > > wonder if we can make that explicit by including
> > "connect"
> > > or
> > > > > > > > > "connector"
> > > > > > > > > > > in the higher level field names? Or do you think this
> > isn't
> > > > > > > required
> > > > > > > > > > given
> > > > > > > > > > > that we're talking about a Connect specific REST API in
> > the
> > > > first
> > > > > > > > > place?
> > > > > > > > > > >
> > > > > > > > > > > 3. Thanks, I think the response structure definitely
> > looks
> > > > better
> > > > > > > now!
> > > > > > > > > > >
> > > > > > > > > > > 4. Interesting, I'd be curious to learn why we might
> want
> > > to
> > > > > > change
> > > > > > > > > this
> > > > > > > > > > in
> > > > > > > > > > > the future but that's probably out of scope for this
> > > > discussion.
> > > > > > > I'm
> > > > > > > > > not
> > > > > > > > > > > sure I followed why the unresolved writes to the config
> > > topic
> > > > > > > would be
> > > > > > > > > an
> > > > > > > > > > > issue - wouldn't the delete offsets request be added to
> > the
> > > > > > > herder's
> > > > > > > > > > > request queue and whenever it is processed, we'd anyway
> > > need
> > > > to
> > > > > > > check
> > > > > > > > > if
> > > > > > > > > > > all the prerequisites for the request are satisfied?
> > > > > > > > > > >
> > > > > > > > > > > 5. Thanks for elaborating that just fencing out the
> > > producer
> > > > > > still
> > > > > > > > > leaves
> > > > > > > > > > > many cases where source tasks remain hanging around and
> > > also
> > > > that
> > > > > > > we
> > > > > > > > > > anyway
> > > > > > > > > > > can't have similar data production guarantees for sink
> > > > connectors
> > > > > > > right
> > > > > > > > > > > now. I agree that it might be better to go with ease of
> > > > > > > implementation
> > > > > > > > > > and
> > > > > > > > > > > consistency for now.
> > > > > > > > > > >
> > > > > > > > > > > 6. Right, that does make sense but I still feel like
> the
> > > two
> > > > > > states
> > > > > > > > > will
> > > > > > > > > > > end up being confusing to end users who might not be
> able
> > > to
> > > > > > > discern
> > > > > > > > > the
> > > > > > > > > > > (fairly low-level) differences between them (also the
> > > > nuances of
> > > > > > > state
> > > > > > > > > > > transitions like STOPPED -> PAUSED or PAUSED -> STOPPED
> > > with
> > > > the
> > > > > > > > > > > rebalancing implications as well). We can probably
> > revisit
> > > > this
> > > > > > > > > potential
> > > > > > > > > > > deprecation in the future based on user feedback and
> what
> > > the
> > > > > > > adoption
> > > > > > > > > of
> > > > > > > > > > > the new proposed stop endpoint looks like, what do you
> > > think?
> > > > > > > > > > >
> > > > > > > > > > > 7. Aha, that is completely my bad, I missed that the
> > v1/v2
> > > > state
> > > > > > is
> > > > > > > > > only
> > > > > > > > > > > applicable to the connector's target state and that we
> > > don't
> > > > need
> > > > > > > to
> > > > > > > > > > worry
> > > > > > > > > > > about the tasks since we will have an empty set of
> > tasks. I
> > > > > > think I
> > > > > > > > > was a
> > > > > > > > > > > little confused by "pause the parts of the connector
> that
> > > > they
> > > > > > are
> > > > > > > > > > > assigned" from the KIP. Thanks for clarifying that!
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Some more thoughts and questions that I had -
> > > > > > > > > > >
> > > > > > > > > > > 8. Could you elaborate on what the implementation for
> > > offset
> > > > > > reset
> > > > > > > for
> > > > > > > > > > > source connectors would look like? Currently, it
> doesn't
> > > look
> > > > > > like
> > > > > > > we
> > > > > > > > > > track
> > > > > > > > > > > all the partitions for a source connector anywhere.
> Will
> > we
> > > > need
> > > > > > to
> > > > > > > > > > > book-keep this somewhere in order to be able to emit a
> > > > tombstone
> > > > > > > record
> > > > > > > > > > for
> > > > > > > > > > > each source partition?
> > > > > > > > > > >
> > > > > > > > > > > 9. The KIP describes the offset reset endpoint as only
> > > being
> > > > > > > usable on
> > > > > > > > > > > existing connectors that are in a `STOPPED` state. Why
> > > > wouldn't
> > > > > > we
> > > > > > > want
> > > > > > > > > > to
> > > > > > > > > > > allow resetting offsets for a deleted connector which
> > seems
> > > > to
> > > > > > be a
> > > > > > > > > valid
> > > > > > > > > > > use case? Or do we plan to handle this use case only
> via
> > > the
> > > > item
> > > > > > > > > > outlined
> > > > > > > > > > > in the future work section - "Automatically delete
> > offsets
> > > > with
> > > > > > > > > > > connectors"?
> > > > > > > > > > >
> > > > > > > > > > > 10. The KIP mentions that source offsets will be reset
> > > > > > > transactionally
> > > > > > > > > > for
> > > > > > > > > > > each topic (worker global offset topic and connector
> > > specific
> > > > > > > offset
> > > > > > > > > > topic
> > > > > > > > > > > if it exists). While it obviously isn't possible to
> > > > atomically do
> > > > > > > the
> > > > > > > > > > > writes to two topics which may be on different Kafka
> > > > clusters,
> > > > > > I'm
> > > > > > > > > > > wondering about what would happen if the first
> > transaction
> > > > > > > succeeds but
> > > > > > > > > > the
> > > > > > > > > > > second one fails. I think the order of the two
> > transactions
> > > > > > matters
> > > > > > > > > here
> > > > > > > > > > -
> > > > > > > > > > > if we successfully emit tombstones to the connector
> > > specific
> > > > > > offset
> > > > > > > > > topic
> > > > > > > > > > > and fail to do so for the worker global offset topic,
> > we'll
> > > > > > > presumably
> > > > > > > > > > fail
> > > > > > > > > > > the offset delete request because the KIP mentions that
> > "A
> > > > > > request
> > > > > > > to
> > > > > > > > > > reset
> > > > > > > > > > > offsets for a source connector will only be considered
> > > > successful
> > > > > > > if
> > > > > > > > > the
> > > > > > > > > > > worker is able to delete all known offsets for that
> > > > connector, on
> > > > > > > both
> > > > > > > > > > the
> > > > > > > > > > > worker's global offsets topic and (if one is used) the
> > > > > > connector's
> > > > > > > > > > > dedicated offsets topic.". However, this will lead to
> the
> > > > > > connector
> > > > > > > > > only
> > > > > > > > > > > being able to read potentially older offsets from the
> > > worker
> > > > > > global
> > > > > > > > > > offset
> > > > > > > > > > > topic on resumption (based on the combined offset view
> > > > presented
> > > > > > as
> > > > > > > > > > > described in KIP-618 [1]). So, I think we should make
> > sure
> > > > that
> > > > > > the
> > > > > > > > > > worker
> > > > > > > > > > > global offset topic tombstoning is attempted first,
> > right?
> > > > Note
> > > > > > > that in
> > > > > > > > > > the
> > > > > > > > > > > current implementation of
> > > > `ConnectorOffsetBackingStore::set`, the
> > > > > > > > > > primary /
> > > > > > > > > > > connector specific offset store is written to first.
> > > > > > > > > > >
> > > > > > > > > > > 11. This probably isn't necessary to elaborate on in
> the
> > > KIP
> > > > > > > itself,
> > > > > > > > > but
> > > > > > > > > > I
> > > > > > > > > > > was wondering what the second offset test - "verify
> that
> > > > > > those
> > > > > > > > > > offsets
> > > > > > > > > > > reflect an expected level of progress for each
> connector
> > > > (i.e.,
> > > > > > > they
> > > > > > > > > are
> > > > > > > > > > > greater than or equal to a certain value depending on
> how
> > > the
> > > > > > > > > connectors
> > > > > > > > > > > are configured and how long they have been running)" -
> > > would
> > > > look
> > > > > > > like?
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > > [1] -
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > > > >
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > > > > > > > > > >
> > > > > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton
> > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > >
> > > > > > > > > > > wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for your detailed thoughts.
> > > > > > > > > > > >
> > > > > > > > > > > > 1. In KAFKA-4107 [1], the primary request is exactly
> > > what's
> > > > > > > proposed
> > > > > > > > > in
> > > > > > > > > > > the
> > > > > > > > > > > > KIP right now: a way to reset offsets for connectors.
> > > Sure,
> > > > > > > there's
> > > > > > > > > an
> > > > > > > > > > > > extra step of stopping the connector, but renaming a
> > > > connector
> > > > > > > isn't
> > > > > > > > > as
> > > > > > > > > > > > convenient of an alternative as it may seem since in
> > many
> > > > cases
> > > > > > > you'd
> > > > > > > > > > > also
> > > > > > > > > > > > want to delete the older one, so the complete
> sequence
> > of
> > > > steps
> > > > > > > would
> > > > > > > > > > be
> > > > > > > > > > > > something like delete the old connector, rename it
> > > > (possibly
> > > > > > > > > requiring
> > > > > > > > > > > > modifications to its config file, depending on which
> > API
> > > is
> > > > > > > used),
> > > > > > > > > then
> > > > > > > > > > > > create the renamed variant. It's also just not a
> great
> > > user
> > > > > > > > > > > > experience--even if the practical impacts are limited
> > > > (which,
> > > > > > > IMO,
> > > > > > > > > they
> > > > > > > > > > > are
> > > > > > > > > > > > not), people have been asking for years about why
> they
> > > > have to
> > > > > > > employ
> > > > > > > > > > > this
> > > > > > > > > > > > kind of a workaround for a fairly common use case,
> and
> > we
> > > > don't
> > > > > > > > > really
> > > > > > > > > > > have
> > > > > > > > > > > > a good answer beyond "we haven't implemented
> something
> > > > better
> > > > > > > yet".
> > > > > > > > > On
> > > > > > > > > > > top
> > > > > > > > > > > > of that, you may have external tooling that needs to
> be
> > > > tweaked
> > > > > > > to
> > > > > > > > > > > handle a
> > > > > > > > > > > > new connector name, you may have strict authorization
> > > > policies
> > > > > > > around
> > > > > > > > > > who
> > > > > > > > > > > > can access what connectors, you may have other ACLs
> > > > attached to
> > > > > > > the
> > > > > > > > > > name
> > > > > > > > > > > of
> > > > > > > > > > > > the connector (which can be especially common in the
> > case
> > > > of
> > > > > > sink
> > > > > > > > > > > > connectors, whose consumer group IDs are tied to
> their
> > > > names by
> > > > > > > > > > default),
> > > > > > > > > > > > and leaving around state in the offsets topic that
> can
> > > > never be
> > > > > > > > > cleaned
> > > > > > > > > > > up
> > > > > > > > > > > > presents a bit of a footgun for users. It may not be
> a
> > > > silver
> > > > > > > bullet,
> > > > > > > > > > but
> > > > > > > > > > > > providing some mechanism to reset that state is a
> step
> > in
> > > > the
> > > > > > > right
> > > > > > > > > > > > direction and allows responsible users to more
> > carefully
> > > > > > > administer
> > > > > > > > > > their
> > > > > > > > > > > > cluster without resorting to non-public APIs. That
> > said,
> > > I
> > > > do
> > > > > > > agree
> > > > > > > > > > that
> > > > > > > > > > > a
> > > > > > > > > > > > fine-grained reset/overwrite API would be useful, and
> > I'd
> > > > be
> > > > > > > happy to
> > > > > > > > > > > > review a KIP to add that feature if anyone wants to
> > > tackle
> > > > it!
> > > > > > > > > > > >
> > > > > > > > > > > > 2. Keeping the two formats symmetrical is motivated
> > > mostly
> > > > by
> > > > > > > > > > aesthetics
> > > > > > > > > > > > and quality-of-life for programmatic interaction with
> > the
> > > > API;
> > > > > > > it's
> > > > > > > > > not
> > > > > > > > > > > > really a goal to hide the use of consumer groups from
> > > > users. I
> > > > > > do
> > > > > > > > > agree
> > > > > > > > > > > > that the format is a little strange-looking for sink
> > > > > > connectors,
> > > > > > > but
> > > > > > > > > it
> > > > > > > > > > > > seemed like it would be easier to work with for UIs,
> > > > casual jq
> > > > > > > > > queries,
> > > > > > > > > > > and
> > > > > > > > > > > > CLIs than a more Kafka-specific alternative such as
> > > > > > > > > > > {"<topic>-<partition>":
> > > > > > > > > > > > "<offset>"}, and although it is a little strange, I
> > don't
> > > > think
> > > > > > > it's
> > > > > > > > > > any
> > > > > > > > > > > > less readable or intuitive. That said, I've made some
> > > > tweaks to
> > > > > > > the
> > > > > > > > > > > format
> > > > > > > > > > > > that should make programmatic access even easier;
> > > > specifically,
> > > > > > > I've
> > > > > > > > > > > > removed the "source" and "sink" wrapper fields and
> > > instead
> > > > > > moved
> > > > > > > them
> > > > > > > > > > > into
> > > > > > > > > > > > a top-level object with a "type" and "offsets" field,
> > > just
> > > > like
> > > > > > > you
> > > > > > > > > > > > suggested in point 3 (thanks!). We might also
> consider
> > > > changing
> > > > > > > the
> > > > > > > > > > field
> > > > > > > > > > > > names for sink offsets from "topic", "partition", and
> > > > "offset"
> > > > > > to
> > > > > > > > > > "Kafka
> > > > > > > > > > > > topic", "Kafka partition", and "Kafka offset"
> > > > respectively, to
> > > > > > > reduce
> > > > > > > > > > the
> > > > > > > > > > > > stuttering effect of having a "partition" field
> inside
> > a
> > > > > > > "partition"
> > > > > > > > > > > field
> > > > > > > > > > > > and the same with an "offset" field; thoughts? One
> > final
> > > > > > > point--by
> > > > > > > > > > > equating
> > > > > > > > > > > > source and sink offsets, we probably make it easier
> for
> > > > users
> > > > > > to
> > > > > > > > > > > understand
> > > > > > > > > > > > exactly what a source offset is; anyone who's
> familiar
> > > with
> > > > > > > consumer
> > > > > > > > > > > > offsets can see from the response format that we
> > > identify a
> > > > > > > logical
> > > > > > > > > > > > partition as a combination of two entities (a topic
> > and a
> > > > > > > partition
> > > > > > > > > > > > number); it should make it easier to grok what a
> source
> > > > offset
> > > > > > > is by
> > > > > > > > > > > seeing
> > > > > > > > > > > > what the two formats have in common.
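> > > > > > > > > > > >
> > > > > > > > > > > > To make that concrete (values made up, and the exact field
> > > > > > > > > > > > names still up for debate), a sink connector's response would
> > > > > > > > > > > > look something like:
> > > > > > > > > > > >
> > > > > > > > > > > >   {
> > > > > > > > > > > >     "type": "sink",
> > > > > > > > > > > >     "offsets": [
> > > > > > > > > > > >       {
> > > > > > > > > > > >         "partition": {"kafka_topic": "orders", "kafka_partition": 2},
> > > > > > > > > > > >         "offset": {"kafka_offset": 4790}
> > > > > > > > > > > >       }
> > > > > > > > > > > >     ]
> > > > > > > > > > > >   }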
> > > > > > > > > > > >
> > > > > > > > > > > > 3. Great idea! Done.
> > > > > > > > > > > >
> > > > > > > > > > > > 4. Yes, I'm thinking right now that a 409 will be the
> > > > response
> > > > > > > status
> > > > > > > > > > if
> > > > > > > > > > > a
> > > > > > > > > > > > rebalance is pending. I'd rather not add this to the
> > KIP
> > > > as we
> > > > > > > may
> > > > > > > > > want
> > > > > > > > > > > to
> > > > > > > > > > > > change it at some point and it doesn't seem vital to
> > > > establish
> > > > > > > it as
> > > > > > > > > > part
> > > > > > > > > > > > of the public contract for the new endpoint right
> now.
> > > > Also,
> > > > > > > small
> > > > > > > > > > > > point--yes, a 409 is useful to avoid forwarding
> > requests
> > > > to an
> > > > > > > > > > incorrect
> > > > > > > > > > > > leader, but it's also useful to ensure that there
> > aren't
> > > > any
> > > > > > > > > unresolved
> > > > > > > > > > > > writes to the config topic that might cause issues
> with
> > > the
> > > > > > > request
> > > > > > > > > > (such
> > > > > > > > > > > > as deleting the connector).
> > > > > > > > > > > >
> > > > > > > > > > > > 5. That's a good point--it may be misleading to call
> a
> > > > > > connector
> > > > > > > > > > STOPPED
> > > > > > > > > > > > when it has zombie tasks lying around on the
> cluster. I
> > > > don't
> > > > > > > think
> > > > > > > > > > it'd
> > > > > > > > > > > be
> > > > > > > > > > > > appropriate to do this synchronously while handling
> > > > requests to
> > > > > > > the
> > > > > > > > > PUT
> > > > > > > > > > > > /connectors/{connector}/stop since we'd want to give
> > all
> > > > > > > > > > > currently-running
> > > > > > > > > > > > tasks a chance to gracefully shut down, though. I'm
> > also
> > > > not
> > > > > > sure
> > > > > > > > > that
> > > > > > > > > > > this
> > > > > > > > > > > > is a significant problem, either. If the connector is
> > > > resumed,
> > > > > > > then
> > > > > > > > > all
> > > > > > > > > > > > zombie tasks will be automatically fenced out by
> their
> > > > > > > successors on
> > > > > > > > > > > > startup; if it's deleted, then we'll have wasted
> effort
> > > by
> > > > > > > performing
> > > > > > > > > > an
> > > > > > > > > > > > unnecessary round of fencing. It may be nice to
> > guarantee
> > > > that
> > > > > > > source
> > > > > > > > > > > task
> > > > > > > > > > > > resources will be deallocated after the connector
> > > > transitions
> > > > > > to
> > > > > > > > > > STOPPED,
> > > > > > > > > > > > but realistically, it doesn't do much to just fence
> out
> > > > their
> > > > > > > > > > producers,
> > > > > > > > > > > > since tasks can be blocked on a number of other
> > > operations
> > > > such
> > > > > > > as
> > > > > > > > > > > > key/value/header conversion, transformation, and task
> > > > polling.
> > > > > > > It may
> > > > > > > > > > be
> > > > > > > > > > > a
> > > > > > > > > > > > little strange if data is produced to Kafka after the
> > > > connector
> > > > > > > has
> > > > > > > > > > > > transitioned to STOPPED, but we can't provide the
> same
> > > > > > > guarantees for
> > > > > > > > > > > sink
> > > > > > > > > > > > connectors, since their tasks may be stuck on a
> > > > long-running
> > > > > > > > > > > SinkTask::put
> > > > > > > > > > > > that emits data even after the Connect framework has
> > > > abandoned
> > > > > > > them
> > > > > > > > > > after
> > > > > > > > > > > > exhausting their graceful shutdown timeout.
> Ultimately,
> > > I'd
> > > > > > > prefer to
> > > > > > > > > > err
> > > > > > > > > > > > on the side of consistency and ease of implementation
> > for
> > > > now,
> > > > > > > but I
> > > > > > > > > > may
> > > > > > > > > > > be
> > > > > > > > > > > > missing a case where a few extra records from a task
> > > that's
> > > > > > slow
> > > > > > > to
> > > > > > > > > > shut
> > > > > > > > > > > > down may cause serious issues--let me know.
> > > > > > > > > > > >
> > > > > > > > > > > > 6. I'm hesitant to propose deprecation of the PAUSED
> > > state
> > > > > > right
> > > > > > > now
> > > > > > > > > as
> > > > > > > > > > > it
> > > > > > > > > > > > does serve a few purposes. Leaving tasks
> > idling-but-ready
> > > > makes
> > > > > > > > > > resuming
> > > > > > > > > > > > them less disruptive across the cluster, since a
> > > rebalance
> > > > > > isn't
> > > > > > > > > > > necessary.
> > > > > > > > > > > > It also reduces latency to resume the connector,
> > > > especially for
> > > > > > > ones
> > > > > > > > > > that
> > > > > > > > > > > > have to do a lot of state gathering on initialization
> > to,
> > > > e.g.,
> > > > > > > read
> > > > > > > > > > > > offsets from an external system.
> > > > > > > > > > > >
> > > > > > > > > > > > 7. There should be no risk of mixed tasks after a
> > > > downgrade,
> > > > > > > thanks
> > > > > > > > > to
> > > > > > > > > > > the
> > > > > > > > > > > > empty set of task configs that gets published to the
> > > config
> > > > > > > topic.
> > > > > > > > > Both
> > > > > > > > > > > > upgraded and downgraded workers will render an empty
> > set
> > > of
> > > > > > > tasks for
> > > > > > > > > > the
> > > > > > > > > > > > connector, and keep that set of empty tasks until the
> > > > connector
> > > > > > > is
> > > > > > > > > > > resumed.
> > > > > > > > > > > > Does that address your concerns?
> > > > > > > > > > > >
> > > > > > > > > > > > You're also correct that the linked Jira ticket was
> > > wrong;
> > > > > > > thanks for
> > > > > > > > > > > > pointing that out! Yes, KAFKA-4107 is the intended
> > > ticket,
> > > > and
> > > > > > > I've
> > > > > > > > > > > updated
> > > > > > > > > > > > the link in the KIP accordingly.
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
> > > > > > > > > > > >
> > > > > > > > > > > > [1] -
> https://issues.apache.org/jira/browse/KAFKA-4107
> > > > > > > > > > > >
> > > > > > > > > > > > On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <
> > > > > > > yash.ma...@gmail.com>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks a lot for this KIP, I think something like
> > this
> > > > has
> > > > > > been
> > > > > > > > > long
> > > > > > > > > > > > > overdue for Kafka Connect :)
> > > > > > > > > > > > >
> > > > > > > > > > > > > Some thoughts and questions that I had -
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. I'm wondering if you could elaborate a little
> more
> > > on
> > > > the
> > > > > > > use
> > > > > > > > > case
> > > > > > > > > > > for
> > > > > > > > > > > > > the `DELETE /connectors/{connector}/offsets` API. I
> > > > think we
> > > > > > > can
> > > > > > > > > all
> > > > > > > > > > > > agree
> > > > > > > > > > > > > that a fine grained reset API that allows setting
> > > > arbitrary
> > > > > > > offsets
> > > > > > > > > > for
> > > > > > > > > > > > > partitions would be quite useful (which you talk
> > about
> > > > in the
> > > > > > > > > Future
> > > > > > > > > > > work
> > > > > > > > > > > > > section). But for the `DELETE
> > > > > > /connectors/{connector}/offsets`
> > > > > > > API
> > > > > > > > > in
> > > > > > > > > > > its
> > > > > > > > > > > > > described form, it looks like it would only serve a
> > > > seemingly
> > > > > > > niche
> > > > > > > > > > use
> > > > > > > > > > > > > case where users want to avoid renaming connectors
> -
> > > > because
> > > > > > > this
> > > > > > > > > new
> > > > > > > > > > > way
> > > > > > > > > > > > > of resetting offsets actually has more steps (i.e.
> > stop
> > > > the
> > > > > > > > > > connector,
> > > > > > > > > > > > > reset offsets via the API, resume the connector)
> than
> > > > simply
> > > > > > > > > deleting
> > > > > > > > > > > and
> > > > > > > > > > > > > re-creating the connector with a different name?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. The KIP talks about taking care that the
> response
> > > > formats
> > > > > > > > > > > (presumably
> > > > > > > > > > > > > only talking about the new GET API here) are
> > > symmetrical
> > > > for
> > > > > > > both
> > > > > > > > > > > source
> > > > > > > > > > > > > and sink connectors - is the end goal to have users
> > of
> > > > Kafka
> > > > > > > > > Connect
> > > > > > > > > > > not
> > > > > > > > > > > > > even be aware that sink connectors use Kafka
> > consumers
> > > > under
> > > > > > > the
> > > > > > > > > hood
> > > > > > > > > > > > (i.e.
> > > > > > > > > > > > > have that as purely an implementation detail
> > abstracted
> > > > away
> > > > > > > from
> > > > > > > > > > > users)?
> > > > > > > > > > > > > While I understand the value of uniformity here,
> the
> > > > response
> > > > > > > > > format
> > > > > > > > > > > for
> > > > > > > > > > > > > sink connectors currently looks a little odd with
> the
> > > > > > > "partition"
> > > > > > > > > > field
> > > > > > > > > > > > > having "topic" and "partition" as sub-fields,
> > > especially
> > > > to
> > > > > > > users
> > > > > > > > > > > > familiar
> > > > > > > > > > > > > with Kafka semantics. Thoughts?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3. Another little nitpick on the response format -
> > why
> > > > do we
> > > > > > > need
> > > > > > > > > > > > "source"
> > > > > > > > > > > > > / "sink" as a field under "offsets"? Users can
> query
> > > the
> > > > > > > connector
> > > > > > > > > > type
> > > > > > > > > > > > via
> > > > > > > > > > > > > the existing `GET /connectors` API. If it's deemed
> > > > important
> > > > > > > to let
> > > > > > > > > > > users
> > > > > > > > > > > > > know that the offsets they're seeing correspond to
> a
> > > > source /
> > > > > > > sink
> > > > > > > > > > > > > connector, maybe we could have a top level field
> > "type"
> > > > in
> > > > > > the
> > > > > > > > > > response
> > > > > > > > > > > > for
> > > > > > > > > > > > > the `GET /connectors/{connector}/offsets` API
> similar
> > > to
> > > > the
> > > > > > > `GET
> > > > > > > > > > > > > /connectors` API?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 4. For the `DELETE /connectors/{connector}/offsets`
> > > API,
> > > > the
> > > > > > > KIP
> > > > > > > > > > > mentions
> > > > > > > > > > > > > that requests will be rejected if a rebalance is
> > > pending
> > > > -
> > > > > > > > > presumably
> > > > > > > > > > > > this
> > > > > > > > > > > > > is to avoid forwarding requests to a leader which
> may
> > > no
> > > > > > > longer be
> > > > > > > > > > the
> > > > > > > > > > > > > leader after the pending rebalance? In this case,
> the
> > > API
> > > > > > will
> > > > > > > > > > return a
> > > > > > > > > > > > > `409 Conflict` response similar to some of the
> > existing
> > > > APIs,
> > > > > > > > > right?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 5. Regarding fencing out previously running tasks
> > for a
> > > > > > > connector,
> > > > > > > > > do
> > > > > > > > > > > you
> > > > > > > > > > > > > think it would make more sense semantically to have
> > > this
> > > > > > > > > implemented
> > > > > > > > > > in
> > > > > > > > > > > > the
> > > > > > > > > > > > > stop endpoint where an empty set of tasks is
> > generated,
> > > > > > rather
> > > > > > > than
> > > > > > > > > > the
> > > > > > > > > > > > > delete offsets endpoint? This would also give the
> new
> > > > > > `STOPPED`
> > > > > > > > > > state a
> > > > > > > > > > > > > higher confidence of sorts, with any zombie tasks
> > being
> > > > > > fenced
> > > > > > > off
> > > > > > > > > > from
> > > > > > > > > > > > > continuing to produce data.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 6. Thanks for outlining the issues with the current
> > > > state of
> > > > > > > the
> > > > > > > > > > > `PAUSED`
> > > > > > > > > > > > > state - I think a lot of users expect it to behave
> > like
> > > > the
> > > > > > > > > `STOPPED`
> > > > > > > > > > > > state
> > > > > > > > > > > > > you outline in the KIP and are (unpleasantly)
> > surprised
> > > > when
> > > > > > it
> > > > > > > > > > > doesn't.
> > > > > > > > > > > > > However, this does beg the question of what the
> > > > usefulness of
> > > > > > > > > having
> > > > > > > > > > > two
> > > > > > > > > > > > > separate `PAUSED` and `STOPPED` states is? Do we
> want
> > > to
> > > > > > > continue
> > > > > > > > > > > > > supporting both these states in the future, or do
> you
> > > > see the
> > > > > > > > > > `STOPPED`
> > > > > > > > > > > > > state eventually causing the existing `PAUSED`
> state
> > to
> > > > be
> > > > > > > > > > deprecated?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 7. I think the idea outlined in the KIP for
> handling
> > a
> > > > new
> > > > > > > state
> > > > > > > > > > during
> > > > > > > > > > > > > cluster downgrades / rolling upgrades is quite
> > clever,
> > > > but do
> > > > > > > you
> > > > > > > > > > think
> > > > > > > > > > > > > there could be any issues with having a mix of
> > "paused"
> > > > and
> > > > > > > > > "stopped"
> > > > > > > > > > > > tasks
> > > > > > > > > > > > > for the same connector across workers in a cluster?
> > At
> > > > the
> > > > > > very
> > > > > > > > > > least,
> > > > > > > > > > > I
> > > > > > > > > > > > > think it would be fairly confusing to most users.
> I'm
> > > > > > > wondering if
> > > > > > > > > > this
> > > > > > > > > > > > can
> > > > > > > > > > > > > be avoided by stating clearly in the KIP that the
> new
> > > > `PUT
> > > > > > > > > > > > > /connectors/{connector}/stop`
> > > > > > > > > > > > > can only be used on a cluster that is fully
> upgraded
> > to
> > > > an AK
> > > > > > > > > version
> > > > > > > > > > > > newer
> > > > > > > > > > > > > than the one which ends up containing changes from
> > this
> > > > KIP
> > > > > > and
> > > > > > > > > that
> > > > > > > > > > > if a
> > > > > > > > > > > > > cluster needs to be downgraded to an older version,
> > the
> > > > user
> > > > > > > should
> > > > > > > > > > > > ensure
> > > > > > > > > > > > > that none of the connectors on the cluster are in a
> > > > stopped
> > > > > > > state?
> > > > > > > > > > With
> > > > > > > > > > > > the
> > > > > > > > > > > > > existing implementation, it looks like an
> > > unknown/invalid
> > > > > > > target
> > > > > > > > > > state
> > > > > > > > > > > > > record is basically just discarded (with an error
> > > message
> > > > > > > logged),
> > > > > > > > > so
> > > > > > > > > > > it
> > > > > > > > > > > > > doesn't seem to be a disastrous failure scenario
> that
> > > can
> > > > > > bring
> > > > > > > > > down
> > > > > > > > > > a
> > > > > > > > > > > > > worker.
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yash
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton
> > > > > > > > > > <chr...@aiven.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Ashwin,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your thoughts. Regarding your
> questions:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. The response would show the offsets that are
> > > > visible to
> > > > > > > the
> > > > > > > > > > source
> > > > > > > > > > > > > > connector, so it would combine the contents of
> the
> > > two
> > > > > > > topics,
> > > > > > > > > > giving
> > > > > > > > > > > > > > priority to offsets present in the
> > connector-specific
> > > > > > topic.
> > > > > > > I'm
> > > > > > > > > > > > > imagining
> > > > > > > > > > > > > > a follow-up question that some people may have in
> > > > response
> > > > > > to
> > > > > > > > > that
> > > > > > > > > > is
> > > > > > > > > > > > > > whether we'd want to provide insight into the
> > > contents
> > > > of a
> > > > > > > > > single
> > > > > > > > > > > > topic
> > > > > > > > > > > > > at
> > > > > > > > > > > > > > a time. It may be useful to be able to see this
> > > > information
> > > > > > > in
> > > > > > > > > > order
> > > > > > > > > > > to
> > > > > > > > > > > > > > debug connector issues or verify that it's safe
> to
> > > stop
> > > > > > > using a
> > > > > > > > > > > > > > connector-specific offsets topic (either
> > explicitly,
> > > or
> > > > > > > > > implicitly
> > > > > > > > > > > via
> > > > > > > > > > > > > > cluster downgrade). What do you think about
> adding
> > a
> > > > URL
> > > > > > > query
> > > > > > > > > > > > parameter
> > > > > > > > > > > > > > that allows users to dictate which view of the
> > > > connector's
> > > > > > > > > offsets
> > > > > > > > > > > they
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > given in the REST response, with options for the
> > > > worker's
> > > > > > > global
> > > > > > > > > > > topic,
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > connector-specific topic, and the combined view
> of
> > > them
> > > > > > that
> > > > > > > the
> > > > > > > > > > > > > connector
> > > > > > > > > > > > > > and its tasks see (which would be the default)?
> > This
> > > > may be
> > > > > > > too
> > > > > > > > > > much
> > > > > > > > > > > > for
> > > > > > > > > > > > > V1
> > > > > > > > > > > > > > but it feels like it's at least worth exploring a
> > > bit.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. There is no option for this at the moment.
> Reset
> > > > > > > semantics are
> > > > > > > > > > > > > extremely
> > > > > > > > > > > > > > coarse-grained; for source connectors, we delete
> > all
> > > > source
> > > > > > > > > > offsets,
> > > > > > > > > > > > and
> > > > > > > > > > > > > > for sink connectors, we delete the entire
> consumer
> > > > group.
> > > > > > I'm
> > > > > > > > > > hoping
> > > > > > > > > > > > this
> > > > > > > > > > > > > > will be enough for V1 and that, if there's
> > sufficient
> > > > > > demand
> > > > > > > for
> > > > > > > > > > it,
> > > > > > > > > > > we
> > > > > > > > > > > > > can
> > > > > > > > > > > > > > introduce a richer API for resetting or even
> > > modifying
> > > > > > > connector
> > > > > > > > > > > > offsets
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > a follow-up KIP.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 3. Good eye :) I think it's fine to keep the
> > existing
> > > > > > > behavior
> > > > > > > > > for
> > > > > > > > > > > the
> > > > > > > > > > > > > > PAUSED state with the Connector instance, since
> the
> > > > primary
> > > > > > > > > purpose
> > > > > > > > > > > of
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > Connector is to generate task configs and monitor
> > the
> > > > > > > external
> > > > > > > > > > system
> > > > > > > > > > > > for
> > > > > > > > > > > > > > changes. If there's no chance for tasks to be
> > running
> > > > > > > anyways, I
> > > > > > > > > > > don't
> > > > > > > > > > > > > see
> > > > > > > > > > > > > > much value in allowing paused connectors to
> > generate
> > > > new
> > > > > > task
> > > > > > > > > > > configs,
> > > > > > > > > > > > > > especially since each time that happens a
> rebalance
> > > is
> > > > > > > triggered
> > > > > > > > > > and
> > > > > > > > > > > > > > there's a non-zero cost to that. What do you
> think?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin
> > > > > > > > > > <apan...@confluent.io.invalid
> > > > > > > > > > > >
> > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the KIP, Chris - I think this is a useful
> > > > feature.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Can you please elaborate on the following in
> the
> > > KIP
> > > > -
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 1. How would the response of GET
> > > > > > > > > /connectors/{connector}/offsets
> > > > > > > > > > > look
> > > > > > > > > > > > > > like
> > > > > > > > > > > > > > > if the worker has both global and connector
> > > specific
> > > > > > > offsets
> > > > > > > > > > topic
> > > > > > > > > > > ?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 2. How can we pass reset options like shift-by, to-datetime, etc.
> > > > > > > > > > > > > > > using a REST API like DELETE /connectors/{connector}/offsets?
> > > > > > > > > > > > > > >
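
For readers unfamiliar with those options: they are the kind of thing the
existing consumer-group tooling already supports, along these lines (the
group and topic names here are placeholders):

    kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --group connect-my-sink-connector --topic some-topic \
      --reset-offsets --shift-by -100 --execute

The proposed DELETE endpoint, by contrast, takes no such arguments.
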
> > > > > > > > > > > > > > > 3. Today the PAUSE operation on a connector invokes its stop method
> > > > > > > > > > > > > > > - will there be a change here to reduce confusion with the newly
> > > > > > > > > > > > > > > proposed STOPPED state?
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Ashwin
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton <chr...@aiven.io.invalid>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I noticed a fairly large gap in the first version of this KIP that
> > > > > > > > > > > > > > > > I published last Friday, which has to do with accommodating
> > > > > > > > > > > > > > > > connectors that target different Kafka clusters than the one that
> > > > > > > > > > > > > > > > the Kafka Connect cluster uses for its internal topics, and source
> > > > > > > > > > > > > > > > connectors with dedicated offsets topics. I've since updated the
> > > > > > > > > > > > > > > > KIP to address this gap, which has substantially altered the
> > > > > > > > > > > > > > > > design. Wanted to give a heads-up to anyone that's already started
> > > > > > > > > > > > > > > > reviewing.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io> wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi all,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'd like to begin discussion on a KIP to add offsets support to
> > > > > > > > > > > > > > > > > the Kafka Connect REST API:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Cheers,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Chris
> > > > > > > > > > > > > > > > >
