Hi Greg,

Thanks for your thoughts. Responses inline:

> Does this mean that a connector may be assigned to a non-leader worker
> in the cluster, an alter request comes in, and a connector instance is
> temporarily started on the leader to service the request?

> While the alterOffsets method is being called, will the leader need to
> ignore potential requestTaskReconfiguration calls?
> On the surface, this seems to conflict with the semantics of the rebalance
> subsystem, as connectors are being started where they are not assigned.
> Additionally, it seems to conflict with the semantics of the STOPPED state,
> which when read literally, might imply that the connector is not started
> _anywhere_ in the cluster.

Good catch! This was an oversight on my part when adding the Connector API
for altering offsets. I'd like to keep delegating offset alter requests to
the leader (for reasons discussed below), but it does seem like relying on
Connector::start to deliver a configuration to connectors before invoking
that method is likely to cause more problems than it solves.

One alternative is to add the connector config as an argument to the
alterOffsets method; this would behave similarly to the validate method,
which accepts a raw config and provides no guarantees about where the
connector is hosted when that method is called or whether start has been
invoked on it yet. Thoughts?
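
For concreteness, here's a rough sketch of what that could look like (the
exact signature is illustrative and still up for discussion; the boolean
return is the "has the connector overridden this method" signal discussed
earlier in this thread, and a similar method would go on SinkConnector):

    import java.util.Map;
    import org.apache.kafka.connect.connector.Connector;

    public abstract class SourceConnector extends Connector {

        /**
         * Invoked when offsets are altered or reset via the REST API. The
         * raw connector config is passed in directly, so (like validate)
         * this makes no guarantees about where the connector is hosted or
         * whether start has been invoked on it.
         *
         * @return true if the connector supports (and has validated) the
         *         requested alteration; false if support is unknown
         */
        public boolean alterOffsets(Map<String, String> connectorConfig,
                                    Map<Map<String, ?>, Map<String, ?>> offsets) {
            return false;
        }
    }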

> How will the check for the STOPPED state on alter requests be
> implemented, is it from reading the config topic or the status topic?

Config topic only. If it's critical that alter requests not be overwritten
by zombie tasks, fairly strong guarantees can be provided already:
- For sink connectors, the request will naturally fail if there are any
active members of the consumer group for the connector
- For source connectors, exactly-once support can be enabled, at which
point the preemptive zombie fencing round we perform before proceeding with
the reset request should prevent any zombie source tasks' producers from
writing any more records/offsets to Kafka

We can also check the config topic to ensure that the connector's set of
task configs is empty (discussed further below).
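
To illustrate the sink connector point above: the reset essentially boils
down to a consumer group deletion, and the broker refuses to delete a group
with active members, which is exactly the zombie-task case. A rough sketch
(assuming the default "connect-<name>" group ID convention and no
consumer.override.group.id; the class and method names here are just for
the sketch):

    import java.util.Collections;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.common.errors.GroupNotEmptyException;

    public class SinkOffsetReset {
        // Deleting the consumer group is what wipes the connector's offsets
        public static void reset(Admin admin, String connectorName) throws Exception {
            String groupId = "connect-" + connectorName;
            try {
                admin.deleteConsumerGroups(Collections.singleton(groupId)).all().get();
            } catch (ExecutionException e) {
                // The broker returns GROUP_NOT_EMPTY while members are active,
                // so zombie tasks cause the request to fail loudly, not silently
                if (e.getCause() instanceof GroupNotEmptyException) {
                    throw new IllegalStateException("Offsets cannot be reset while "
                            + "the connector's tasks are still running", e);
                }
                throw e;
            }
        }
    }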

> Is there synchronization to ensure that a connector on a non-leader is
> STOPPED before an instance is started on the leader? If not, there might be
> a risk of the non-leader connector overwriting the effects of the
> alterOffsets on the leader connector.

A few things to keep in mind here:

1. Connector instances cannot (currently) produce offsets on their own
2. By delegating alter requests to the leader, we can ensure that the
connector's set of task configs is empty before proceeding with the
request. Because task configs are only written to the config topic by the
leader, there's no risk of new tasks spinning up between the leader
performing that check and servicing the offset alteration request. (Well,
technically there is if you have a zombie leader in your cluster, but that
extremely rare scenario is addressed naturally if exactly-once source
support is enabled on the cluster, since the leader then uses a
transactional producer for writes to the config topic.) A rough sketch of
these leader-side checks follows below. This, coupled with the
zombie-handling logic described above, should be sufficient to address
your concerns, but let me know if I've missed anything.
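
Something like this (hypothetical names only; the real logic would live in
the distributed herder, with both values read from the config topic):

    import java.util.List;
    import java.util.Map;

    public class AlterOffsetsPreconditions {

        /**
         * Returns an error message if the alter offsets request cannot
         * proceed on the leader, or null if it can.
         */
        public static String check(String targetState, List<Map<String, String>> taskConfigs) {
            if (!"STOPPED".equals(targetState)) {
                return "Connector must be in the STOPPED state before its offsets can be altered";
            }
            // Task configs are only ever written to the config topic by the
            // leader, so once this check passes on the leader, no new tasks
            // can be brought up before the request is serviced
            if (!taskConfigs.isEmpty()) {
                return "Connector tasks have not yet been fully shut down";
            }
            return null;
        }
    }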

Cheers,

Chris

On Wed, Jan 18, 2023 at 2:57 PM Greg Harris <greg.har...@aiven.io.invalid>
wrote:

> Hi Chris,
>
> I had some clarifying questions about the alterOffsets hooks. The KIP
> includes these elements of the design:
>
> * The Javadoc for the methods mentions that the alterOffsets methods are
> only called on started and initialized connector objects.
> * The 'Altering' and 'Resetting' offsets descriptions indicate that the
> requests are forwarded to the leader.
> * And finally, the description of "A new STOPPED state" includes a note
> that the Connector will not be started, and it will not be able to generate
> new task configurations
>
> 1. Does this mean that a connector may be assigned to a non-leader worker
> in the cluster, an alter request comes in, and a connector instance is
> temporarily started on the leader to service the request?
> 2. While the alterOffsets method is being called, will the leader need to
> ignore potential requestTaskReconfiguration calls?
> On the surface, this seems to conflict with the semantics of the rebalance
> subsystem, as connectors are being started where they are not assigned.
> Additionally, it seems to conflict with the semantics of the STOPPED state,
> which when read literally, might imply that the connector is not started
> _anywhere_ in the cluster.
>
> I think that if we wish to provide these alterOffsets methods, they must be
> called on started and initialized connector objects.
> And if that's the case, then we will need to ignore
> requestTaskReconfiguration calls.
> But we may need to relax the wording on the Stopped state to add an
> exception for temporary starts, while still preventing it from using
> resources in the background.
> We should also consider whether we can influence the rebalance algorithm to
> allocate STOPPED connectors to the leader, or not allocate them at all, or
> use the rebalance algorithm's connector assignment to distribute the
> alterOffsets calls across the cluster.
>
> And slightly related:
> 3. How will the check for the STOPPED state on alter requests be
> implemented, is it from reading the config topic or the status topic?
> 4. Is there synchronization to ensure that a connector on a non-leader is
> STOPPED before an instance is started on the leader? If not, there might be
> a risk of the non-leader connector overwriting the effects of the
> alterOffsets on the leader connector.
>
> Thanks,
> Greg
>
>
> On Fri, Dec 16, 2022 at 8:26 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Thanks for clarifying, I had missed that update in the KIP (the bit about
> > altering/resetting offsets response). I think your arguments for not
> going
> > with an additional method or a custom return type make sense.
> >
> > Thanks,
> > Yash
> >
> > On Sat, Dec 10, 2022 at 12:28 AM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Yash,
> > >
> > > The idea with the boolean is to just signify that a connector has
> > > overridden this method, which allows us to issue a definitive response
> in
> > > the REST API when servicing offset alter/reset requests (described more
> > > in detail here:
> > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect#KIP875:FirstclassoffsetssupportinKafkaConnect-Altering/resettingoffsets(response)
> > > ).
> > >
> > > One alternative could be to add some kind of OffsetAlterResult return
> > type
> > > for this method with constructors like "OffsetAlterResult.success()",
> > > "OffsetAlterResult.unknown()" (default), and
> > > "OffsetAlterResult.failure(Throwable t)", but it's hard to envision a
> > > reason to use the "failure" static factory method instead of just
> > throwing
> > > an exception, at which point, we're only left with two reasonable
> > methods:
> > > success, and unknown.
> > >
> > > Another could be to add a "boolean canResetOffsets()" method to the
> > > SourceConnector class, but adding a separate method seems like
> overkill,
> > > and developers may not understand that implementing that method to
> always
> > > return "false" won't actually cause offset reset requests to not take
> > > place.
> > >
> > > One final option could be to have something like an AlterOffsetsSupport
> > > enum with values SUPPORTED and UNSUPPORTED and a new
> "AlterOffsetsSupport
> > > alterOffsetsSupport()" SourceConnector method that returns null by
> > default
> > > (which implicitly maps to the "unknown support" response message in the
> > > REST API). This would line up with the ExactlyOnceSupport API we added
> in
> > > KIP-618. However, I'm hesitant to adopt it because it's not strictly
> > > necessary to address the cases that we want to cover; everything can be
> > > handled with a single method that gets invoked on offset alter/reset
> > > requests. With exactly-once support, we added this hook because it was
> > > designed to be invoked in a different point in the connector lifecycle.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > > On Wed, Dec 7, 2022 at 9:46 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > >
> > > > Hi Chris,
> > > >
> > > > Sorry for the late reply.
> > > >
> > > > > I don't believe logging an error message is sufficient for
> > > > > handling failures to reset-after-delete. IMO it's highly
> > > > > likely that users will either shoot themselves in the foot
> > > > > by not reading the fine print and realizing that the offset
> > > > > request may have failed, or will ask for better visibility
> > > > > into the success or failure of the reset request than
> > > > > scanning log files.
> > > >
> > > > Your reasoning for deferring the reset offsets after delete
> > functionality
> > > > to a separate KIP makes sense, thanks for the explanation.
> > > >
> > > > > I've updated the KIP with the
> > > > > developer-facing API changes for this logic
> > > >
> > > > This is great, I hadn't considered the other two (very valid)
> use-cases
> > > for
> > > > such an API, thanks for adding these with elaborate documentation!
> > > However,
> > > > the significance / use of the boolean value returned by the two
> methods
> > > is
> > > > not fully clear, could you please clarify?
> > > >
> > > > Thanks,
> > > > Yash
> > > >
> > > > On Fri, Nov 18, 2022 at 1:06 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > >
> > > > > Hi Yash,
> > > > >
> > > > > I've updated the KIP with the correct "kafka_topic",
> > "kafka_partition",
> > > > and
> > > > > "kafka_offset" keys in the JSON examples (settled on those instead
> of
> > > > > prefixing with "Kafka " for better interactions with tooling like
> > JQ).
> > > > I've
> > > > > also added a note about sink offset requests failing if there are
> > still
> > > > > active members in the consumer group.
> > > > >
> > > > > I don't believe logging an error message is sufficient for handling
> > > > > failures to reset-after-delete. IMO it's highly likely that users
> > will
> > > > > either shoot themselves in the foot by not reading the fine print
> and
> > > > > realizing that the offset request may have failed, or will ask for
> > > better
> > > > > visibility into the success or failure of the reset request than
> > > scanning
> > > > > log files. I don't doubt that there are ways to address this, but I
> > > would
> > > > > prefer to leave them to a separate KIP since the required design
> work
> > > is
> > > > > non-trivial and I do not feel that the added burden is worth tying
> to
> > > > this
> > > > > KIP as a blocker.
> > > > >
> > > > > I was really hoping to avoid introducing a change to the
> > > developer-facing
> > > > > APIs with this KIP, but after giving it some thought I think this
> may
> > > be
> > > > > unavoidable. It's debatable whether validation of altered offsets
> is
> > a
> > > > good
> > > > > enough use case on its own for this kind of API, but since there
> are
> > > also
> > > > > connectors out there that manage offsets externally, we should
> > probably
> > > > add
> > > > > a hook to allow those external offsets to be managed, which can
> then
> > > > serve
> > > > > double- or even-triple duty as a hook to validate custom offsets
> and
> > to
> > > > > notify users whether offset resets/alterations are supported at all
> > > > (which
> > > > > they may not be if, for example, offsets are coupled tightly with
> the
> > > > data
> > > > > written by a sink connector). I've updated the KIP with the
> > > > > developer-facing API changes for this logic; let me know what you
> > > think.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Mon, Nov 14, 2022 at 10:16 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > >
> > > > > > Hi Chris,
> > > > > >
> > > > > > Thanks for the update!
> > > > > >
> > > > > > It's relatively common to only want to reset offsets for a
> specific
> > > > > > resource (for example with MirrorMaker for one or a group of
> > topics).
> > > > > > Could it be possible to add a way to do so? Either by providing a
> > > > > > payload to DELETE or by setting the offset field to an empty
> object
> > > in
> > > > > > the PATCH payload?
> > > > > >
> > > > > > Thanks,
> > > > > > Mickael
> > > > > >
> > > > > > On Sat, Nov 12, 2022 at 3:33 PM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > >
> > > > > > > Hi Chris,
> > > > > > >
> > > > > > > Thanks for pointing out that the consumer group deletion step
> > > itself
> > > > > will
> > > > > > > fail in case of zombie sink tasks. Since we can't get any
> > stronger
> > > > > > > guarantees from consumers (unlike with transactional
> producers),
> > I
> > > > > think
> > > > > > it
> > > > > > > makes perfect sense to fail the offset reset attempt in such
> > > > scenarios
> > > > > > with
> > > > > > > a relevant error message to the user. I was more concerned
> about
> > > > > silently
> > > > > > > failing but it looks like that won't be an issue. It's probably
> > > worth
> > > > > > > calling out this difference between source / sink connectors
> > > > explicitly
> > > > > > in
> > > > > > > the KIP, what do you think?
> > > > > > >
> > > > > > > > changing the field names for sink offsets
> > > > > > > > from "topic", "partition", and "offset" to "Kafka
> > > > > > > > topic", "Kafka partition", and "Kafka offset" respectively,
> to
> > > > > > > > reduce the stuttering effect of having a "partition" field
> > inside
> > > > > > > >  a "partition" field and the same with an "offset" field
> > > > > > >
> > > > > > > The KIP is still using the nested partition / offset fields by
> > the
> > > > way
> > > > > -
> > > > > > > has it not been updated because we're waiting for consensus on
> > the
> > > > > field
> > > > > > > names?
> > > > > > >
> > > > > > > > The reset-after-delete feature, on the other
> > > > > > > > hand, is actually pretty tricky to design; I've updated the
> > > > > > > > rationale in the KIP for delaying it and clarified that it's
> > not
> > > > > > > > just a matter of implementation but also design work.
> > > > > > >
> > > > > > > I like the idea of writing an offset reset request to the
> config
> > > > topic
> > > > > > > which will be processed by the herder's config update listener
> -
> > > I'm
> > > > > not
> > > > > > > sure I fully follow the concerns with regard to handling
> > failures?
> > > > Why
> > > > > > > can't we simply log an error saying that the offset reset for
> the
> > > > > deleted
> > > > > > > connector "xyz" failed due to reason "abc"? As long as it's
> > > > documented
> > > > > > that
> > > > > > > connector deletion and offset resets are asynchronous and a
> > success
> > > > > > > response only means that the request was initiated successfully
> > > > (which
> > > > > is
> > > > > > > the case even today with normal connector deletion), we should
> be
> > > > fine
> > > > > > > right?
> > > > > > >
> > > > > > > Thanks for adding the new PATCH endpoint to the KIP, I think
> > it's a
> > > > lot
> > > > > > > more useful for this use case than a PUT endpoint would be! One
> > > thing
> > > > > > > that I was thinking about with the new PATCH endpoint is that
> > while
> > > > we
> > > > > > can
> > > > > > > easily validate the request body format for sink connectors
> > (since
> > > > it's
> > > > > > the
> > > > > > > same across all connectors), we can't do the same for source
> > > > connectors
> > > > > > as
> > > > > > > things stand today since each source connector implementation
> can
> > > > > define
> > > > > > > its own source partition and offset structures. Without any
> > > > validation,
> > > > > > > writing a bad offset for a source connector via the PATCH
> > endpoint
> > > > > could
> > > > > > > cause it to fail with hard to discern errors. I'm wondering if
> we
> > > > could
> > > > > > add
> > > > > > > a new method to the `SourceConnector` class (which should be
> > > > overridden
> > > > > > by
> > > > > > > source connector implementations) that would validate whether
> or
> > > not
> > > > > the
> > > > > > > provided source partitions and source offsets are valid for the
> > > > > connector
> > > > > > > (it could have a default implementation returning true
> > > > unconditionally
> > > > > > for
> > > > > > > backward compatibility).
> > > > > > >
> > > > > > > > I've also added an implementation plan to the KIP, which
> calls
> > > > > > > > out the different parts that can be worked on independently
> so
> > > that
> > > > > > > > others (hi Yash 🙂) can also tackle parts of this if they'd
> > like.
> > > > > > >
> > > > > > > I'd be more than happy to pick up one or more of the
> > implementation
> > > > > > parts,
> > > > > > > thanks for breaking it up into granular pieces!
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Yash
> > > > > > >
> > > > > > > On Fri, Nov 11, 2022 at 11:25 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > >
> > > > > > > > Hi Mickael,
> > > > > > > >
> > > > > > > > Thanks for your feedback. This has been on my TODO list as
> well
> > > :)
> > > > > > > >
> > > > > > > > 1. That's fair! Support for altering offsets is easy enough
> to
> > > > > design,
> > > > > > so
> > > > > > > > I've added it to the KIP. The reset-after-delete feature, on
> > the
> > > > > other
> > > > > > > > hand, is actually pretty tricky to design; I've updated the
> > > > rationale
> > > > > > in
> > > > > > > > the KIP for delaying it and clarified that it's not just a
> > matter
> > > > of
> > > > > > > > implementation but also design work. If you or anyone else
> can
> > > > think
> > > > > > of a
> > > > > > > > clean, simple way to implement it, I'm happy to add it to
> this
> > > KIP,
> > > > > but
> > > > > > > > otherwise I'd prefer not to tie it to the approval and
> release
> > of
> > > > the
> > > > > > > > features already proposed in the KIP.
> > > > > > > >
> > > > > > > > 2. Yeah, it's a little awkward. In my head I've justified the
> > > > > ugliness
> > > > > > of
> > > > > > > > the implementation with the smooth user-facing experience;
> > > falling
> > > > > back
> > > > > > > > seamlessly on the PAUSED state without even logging an error
> > > > message
> > > > > > is a
> > > > > > > > lot better than I'd initially hoped for when I was designing
> > this
> > > > > > feature.
> > > > > > > >
> > > > > > > > I've also added an implementation plan to the KIP, which
> calls
> > > out
> > > > > the
> > > > > > > > different parts that can be worked on independently so that
> > > others
> > > > > (hi
> > > > > > Yash
> > > > > > > > 🙂) can also tackle parts of this if they'd like.
> > > > > > > >
> > > > > > > > Finally, I've removed the "type" field from the response body
> > > > format
> > > > > > for
> > > > > > > > offset read requests. This way, users can copy+paste the
> > response
> > > > > from
> > > > > > that
> > > > > > > > endpoint into a request to alter a connector's offsets
> without
> > > > having
> > > > > > to
> > > > > > > > remove the "type" field first. An alternative was to keep the
> > > > "type"
> > > > > > field
> > > > > > > > and add it to the request body format for altering offsets,
> but
> > > > this
> > > > > > didn't
> > > > > > > > seem to make enough sense for cases not involving the
> > > > aforementioned
> > > > > > > > copy+paste process.
> > > > > > > >
> > > > > > > > Cheers,
> > > > > > > >
> > > > > > > > Chris
> > > > > > > >
> > > > > > > > On Wed, Nov 9, 2022 at 9:57 AM Mickael Maison <mickael.mai...@gmail.com> wrote:
> > > > > > > >
> > > > > > > > > Hi Chris,
> > > > > > > > >
> > > > > > > > > Thanks for the KIP, you're picking something that has been
> in
> > > my
> > > > > todo
> > > > > > > > > list for a while ;)
> > > > > > > > >
> > > > > > > > > It looks good overall, I just have a couple of questions:
> > > > > > > > > 1) I consider both features listed in Future Work pretty
> > > > important.
> > > > > > In
> > > > > > > > > both cases you mention the reason for not addressing them
> now
> > > is
> > > > > > > > > because of the implementation. If the design is simple and
> if
> > > we
> > > > > have
> > > > > > > > > volunteers to implement them, I wonder if we could include
> > them
> > > > in
> > > > > > > > > this KIP. So you would not have to implement everything but
> > we
> > > > > would
> > > > > > > > > have a single KIP and vote.
> > > > > > > > >
> > > > > > > > > 2) Regarding the backward compatibility for the stopped
> > state.
> > > > The
> > > > > > > > > "state.v2" field is a bit unfortunate but I can't think of
> a
> > > > better
> > > > > > > > > solution. The other alternative would be to not do anything
> > > but I
> > > > > > > > > think the graceful degradation you propose is a bit better.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Mickael
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > On Tue, Nov 8, 2022 at 5:58 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > > > > >
> > > > > > > > > > Hi Yash,
> > > > > > > > > >
> > > > > > > > > > Good question! This is actually a subtle source of
> > asymmetry
> > > in
> > > > > the
> > > > > > > > > current
> > > > > > > > > > proposal. Requests to delete a consumer group with active
> > > > members
> > > > > > will
> > > > > > > > > > fail, so if there are zombie sink tasks that are still
> > > > > > communicating
> > > > > > > > with
> > > > > > > > > > Kafka, offset reset requests for that connector will also
> > > fail.
> > > > > It
> > > > > > is
> > > > > > > > > > possible to use an admin client to remove all active
> > members
> > > > from
> > > > > > the
> > > > > > > > > group
> > > > > > > > > > and then delete the group. However, this solution isn't
> as
> > > > > > complete as
> > > > > > > > > the
> > > > > > > > > > zombie fencing that we can perform for exactly-once
> source
> > > > tasks,
> > > > > > since
> > > > > > > > > > removing consumers from a group doesn't prevent them from
> > > > > > immediately
> > > > > > > > > > rejoining the group, which would either cause the group
> > > > deletion
> > > > > > > > request
> > > > > > > > > to
> > > > > > > > > > fail (if they rejoin before the group is deleted), or
> > > recreate
> > > > > the
> > > > > > > > group
> > > > > > > > > > (if they rejoin after the group is deleted).
> > > > > > > > > >
> > > > > > > > > > For ease of implementation, I'd prefer to leave the
> > asymmetry
> > > > in
> > > > > > the
> > > > > > > > API
> > > > > > > > > > for now and fail fast and clearly if there are still
> > > consumers
> > > > > > active
> > > > > > > > in
> > > > > > > > > > the sink connector's group. We can try to detect this
> case
> > > and
> > > > > > provide
> > > > > > > > a
> > > > > > > > > > helpful error message to the user explaining why the
> offset
> > > > reset
> > > > > > > > request
> > > > > > > > > > has failed and some steps they can take to try to resolve
> > > > things
> > > > > > (wait
> > > > > > > > > for
> > > > > > > > > > slow task shutdown to complete, restart zombie workers
> > and/or
> > > > > > workers
> > > > > > > > > with
> > > > > > > > > > blocked tasks on them). In the future we can possibly
> even
> > > > > revisit
> > > > > > > > > KIP-611
> > > > > > > > > > [1] or something like it to provide better insight into
> > > zombie
> > > > > > tasks
> > > > > > > > on a
> > > > > > > > > > worker so that it's easier to find which tasks have been
> > > > > abandoned
> > > > > > but
> > > > > > > > > are
> > > > > > > > > > still running.
> > > > > > > > > >
> > > > > > > > > > Let me know what you think; this is an important point to
> > > call
> > > > > out
> > > > > > and
> > > > > > > > if
> > > > > > > > > > we can reach some consensus on how to handle sink
> connector
> > > > > offset
> > > > > > > > resets
> > > > > > > > > > w/r/t zombie tasks, I'll update the KIP with the details.
> > > > > > > > > >
> > > > > > > > > > [1] -
> > > > > > > > > >
> > > > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-611%3A+Improved+Handling+of+Abandoned+Connectors+and+Tasks
> > > > > > > > > >
> > > > > > > > > > Cheers,
> > > > > > > > > >
> > > > > > > > > > Chris
> > > > > > > > > >
> > > > > > > > > > On Tue, Nov 8, 2022 at 8:00 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Chris,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the response and the explanations, I think
> > > you've
> > > > > > answered
> > > > > > > > > > > pretty much all the questions I had meticulously!
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > if something goes wrong while resetting offsets,
> > there's
> > > no
> > > > > > > > > > > > immediate impact--the connector will still be in the
> > > > STOPPED
> > > > > > > > > > > >  state. The REST response for requests to reset the
> > > offsets
> > > > > > > > > > > > will clearly call out that the operation has failed,
> > and
> > > if
> > > > > > > > > necessary,
> > > > > > > > > > > > we can probably also add a scary-looking warning
> > message
> > > > > > > > > > > > stating that we can't guarantee which offsets have
> been
> > > > > > > > successfully
> > > > > > > > > > > >  wiped and which haven't. Users can query the exact
> > > offsets
> > > > > of
> > > > > > > > > > > > the connector at this point to determine what will
> > happen
> > > > > > if/what
> > > > > > > > > they
> > > > > > > > > > > > resume it. And they can repeat attempts to reset the
> > > > offsets
> > > > > as
> > > > > > > > many
> > > > > > > > > > > >  times as they'd like until they get back a 2XX
> > response,
> > > > > > > > indicating
> > > > > > > > > > > > that it's finally safe to resume the connector.
> > Thoughts?
> > > > > > > > > > >
> > > > > > > > > > > Yeah, I agree, the case that I mentioned earlier where
> a
> > > user
> > > > > > would
> > > > > > > > > try to
> > > > > > > > > > > resume a stopped connector after a failed offset reset
> > > > attempt
> > > > > > > > without
> > > > > > > > > > > knowing that the offset reset attempt didn't fail
> cleanly
> > > is
> > > > > > probably
> > > > > > > > > just
> > > > > > > > > > > an extreme edge case. I think as long as the response
> is
> > > > > verbose
> > > > > > > > > enough and
> > > > > > > > > > > self explanatory, we should be fine.
> > > > > > > > > > >
> > > > > > > > > > > Another question that I had was behavior w.r.t sink
> > > connector
> > > > > > offset
> > > > > > > > > resets
> > > > > > > > > > > when there are zombie tasks/workers in the Connect
> > cluster
> > > -
> > > > > the
> > > > > > KIP
> > > > > > > > > > > mentions that for sink connectors offset resets will be
> > > done
> > > > by
> > > > > > > > > deleting
> > > > > > > > > > > the consumer group. However, if there are zombie tasks
> > > which
> > > > > are
> > > > > > > > still
> > > > > > > > > able
> > > > > > > > > > > to communicate with the Kafka cluster that the sink
> > > connector
> > > > > is
> > > > > > > > > consuming
> > > > > > > > > > > from, I think the consumer group will automatically get
> > > > > > re-created
> > > > > > > > and
> > > > > > > > > the
> > > > > > > > > > > zombie task may be able to commit offsets for the
> > > partitions
> > > > > > that it
> > > > > > > > is
> > > > > > > > > > > consuming from?
> > > > > > > > > > >
> > > > > > > > > > > Thanks,
> > > > > > > > > > > Yash
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks again for your thoughts! Responses to ongoing
> > > > > > discussions
> > > > > > > > > inline
> > > > > > > > > > > > (easier to track context than referencing comment
> > > numbers):
> > > > > > > > > > > >
> > > > > > > > > > > > > However, this then leads me to wonder if we can
> make
> > > that
> > > > > > > > explicit
> > > > > > > > > by
> > > > > > > > > > > > including "connect" or "connector" in the higher
> level
> > > > field
> > > > > > names?
> > > > > > > > > Or do
> > > > > > > > > > > > you think this isn't required given that we're
> talking
> > > > about
> > > > > a
> > > > > > > > > Connect
> > > > > > > > > > > > specific REST API in the first place?
> > > > > > > > > > > >
> > > > > > > > > > > > I think "partition" and "offset" are fine as field
> > names
> > > > but
> > > > > > I'm
> > > > > > > > not
> > > > > > > > > > > hugely
> > > > > > > > > > > > opposed to adding "connector " as a prefix to them;
> > would
> > > > be
> > > > > > > > > interested
> > > > > > > > > > > in
> > > > > > > > > > > > others' thoughts.
> > > > > > > > > > > >
> > > > > > > > > > > > > I'm not sure I followed why the unresolved writes
> to
> > > the
> > > > > > config
> > > > > > > > > topic
> > > > > > > > > > > > would be an issue - wouldn't the delete offsets
> request
> > > be
> > > > > > added to
> > > > > > > > > the
> > > > > > > > > > > > herder's request queue and whenever it is processed,
> > we'd
> > > > > > anyway
> > > > > > > > > need to
> > > > > > > > > > > > check if all the prerequisites for the request are
> > > > satisfied?
> > > > > > > > > > > >
> > > > > > > > > > > > Some requests are handled in multiple steps. For
> > example,
> > > > > > deleting
> > > > > > > > a
> > > > > > > > > > > > connector (1) adds a request to the herder queue to
> > > write a
> > > > > > > > > tombstone to
> > > > > > > > > > > > the config topic (or, if the worker isn't the leader,
> > > > forward
> > > > > > the
> > > > > > > > > request
> > > > > > > > > > > > to the leader). (2) Once that tombstone is picked up,
> > > (3) a
> > > > > > > > rebalance
> > > > > > > > > > > > ensues, and then after it's finally complete, (4) the
> > > > > > connector and
> > > > > > > > > its
> > > > > > > > > > > > tasks are shut down. I probably could have used
> better
> > > > > > terminology,
> > > > > > > > > but
> > > > > > > > > > > > what I meant by "unresolved writes to the config
> topic"
> > > > was a
> > > > > > case
> > > > > > > > in
> > > > > > > > > > > > between steps (2) and (3)--where the worker has
> already
> > > > read
> > > > > > that
> > > > > > > > > > > tombstone
> > > > > > > > > > > > from the config topic and knows that a rebalance is
> > > > pending,
> > > > > > but
> > > > > > > > > hasn't
> > > > > > > > > > > > begun participating in that rebalance yet. In the
> > > > > > DistributedHerder
> > > > > > > > > > > class,
> > > > > > > > > > > > this is done via the `checkRebalanceNeeded` method.
> > > > > > > > > > > >
> > > > > > > > > > > > > We can probably revisit this potential deprecation
> > [of
> > > > the
> > > > > > PAUSED
> > > > > > > > > > > state]
> > > > > > > > > > > > in the future based on user feedback and how the
> > adoption
> > > > of
> > > > > > the
> > > > > > > > new
> > > > > > > > > > > > proposed stop endpoint looks like, what do you think?
> > > > > > > > > > > >
> > > > > > > > > > > > Yeah, revisiting in the future seems reasonable. 👍
> > > > > > > > > > > >
> > > > > > > > > > > > And responses to new comments here:
> > > > > > > > > > > >
> > > > > > > > > > > > 8. Yep, we'll start tracking offsets by connector. I
> > > don't
> > > > > > believe
> > > > > > > > > this
> > > > > > > > > > > > should be too difficult, and suspect that the only
> > reason
> > > > we
> > > > > > track
> > > > > > > > > raw
> > > > > > > > > > > byte
> > > > > > > > > > > > arrays instead of pre-deserializing offset topic
> > > > information
> > > > > > into
> > > > > > > > > > > something
> > > > > > > > > > > > more useful is because Connect originally had
> pluggable
> > > > > > internal
> > > > > > > > > > > > converters. Now that we're hardcoded to use the JSON
> > > > > converter
> > > > > > it
> > > > > > > > > should
> > > > > > > > > > > be
> > > > > > > > > > > > fine to track offsets on a per-connector basis as
> > they're
> > > > > read
> > > > > > from
> > > > > > > > > the
> > > > > > > > > > > > offsets topic.
> > > > > > > > > > > >
> > > > > > > > > > > > 9. I'm hesitant to introduce this type of feature
> right
> > > now
> > > > > > because
> > > > > > > > > of
> > > > > > > > > > > all
> > > > > > > > > > > > of the gotchas that would come with it. In
> > > > security-conscious
> > > > > > > > > > > environments,
> > > > > > > > > > > > it's possible that a sink connector's principal may
> > have
> > > > > > access to
> > > > > > > > > the
> > > > > > > > > > > > consumer group used by the connector, but the
> worker's
> > > > > > principal
> > > > > > > > may
> > > > > > > > > not.
> > > > > > > > > > > > There's also the case where source connectors have
> > > separate
> > > > > > offsets
> > > > > > > > > > > topics,
> > > > > > > > > > > > or sink connectors have overridden consumer group
> IDs,
> > or
> > > > > sink
> > > > > > or
> > > > > > > > > source
> > > > > > > > > > > > connectors work against a different Kafka cluster
> than
> > > the
> > > > > one
> > > > > > that
> > > > > > > > > their
> > > > > > > > > > > > worker uses. Overall, I'd rather provide a single API
> > > that
> > > > > > works in
> > > > > > > > > all
> > > > > > > > > > > > cases rather than risk confusing and alienating users
> > by
> > > > > > trying to
> > > > > > > > > make
> > > > > > > > > > > > their lives easier in a subset of cases.
> > > > > > > > > > > >
> > > > > > > > > > > > 10. Hmm... I don't think the order of the writes
> > matters
> > > > too
> > > > > > much
> > > > > > > > > here,
> > > > > > > > > > > but
> > > > > > > > > > > > we probably could start by deleting from the global
> > topic
> > > > > > first,
> > > > > > > > > that's
> > > > > > > > > > > > true. The reason I'm not hugely concerned about this
> > case
> > > > is
> > > > > > that
> > > > > > > > if
> > > > > > > > > > > > something goes wrong while resetting offsets, there's
> > no
> > > > > > immediate
> > > > > > > > > > > > impact--the connector will still be in the STOPPED
> > state.
> > > > The
> > > > > > REST
> > > > > > > > > > > response
> > > > > > > > > > > > for requests to reset the offsets will clearly call
> out
> > > > that
> > > > > > the
> > > > > > > > > > > operation
> > > > > > > > > > > > has failed, and if necessary, we can probably also
> add
> > a
> > > > > > > > > scary-looking
> > > > > > > > > > > > warning message stating that we can't guarantee which
> > > > offsets
> > > > > > have
> > > > > > > > > been
> > > > > > > > > > > > successfully wiped and which haven't. Users can query
> > the
> > > > > exact
> > > > > > > > > offsets
> > > > > > > > > > > of
> > > > > > > > > > > > the connector at this point to determine what will
> > happen
> > > > > > if/what
> > > > > > > > > they
> > > > > > > > > > > > resume it. And they can repeat attempts to reset the
> > > > offsets
> > > > > as
> > > > > > > > many
> > > > > > > > > > > times
> > > > > > > > > > > > as they'd like until they get back a 2XX response,
> > > > indicating
> > > > > > that
> > > > > > > > > it's
> > > > > > > > > > > > finally safe to resume the connector. Thoughts?
> > > > > > > > > > > >
> > > > > > > > > > > > 11. I haven't thought too much about it. I think
> > > something
> > > > > > like the
> > > > > > > > > > > > Monitorable* connectors would probably serve our
> needs
> > > > here;
> > > > > > we can
> > > > > > > > > > > > instantiate them on a running Connect cluster and
> then
> > > use
> > > > > > various
> > > > > > > > > > > handles
> > > > > > > > > > > > to know how many times they've been polled, committed
> > > > > records,
> > > > > > etc.
> > > > > > > > > If
> > > > > > > > > > > > necessary we can tweak those classes or even write
> our
> > > own.
> > > > > But
> > > > > > > > > anyways,
> > > > > > > > > > > > once that's all done, the test will be something like
> > > > > "create a
> > > > > > > > > > > connector,
> > > > > > > > > > > > wait for it to produce N records (each of which
> > contains
> > > > some
> > > > > > kind
> > > > > > > > of
> > > > > > > > > > > > predictable offset), and ensure that the offsets for
> it
> > > in
> > > > > the
> > > > > > REST
> > > > > > > > > API
> > > > > > > > > > > > match up with the ones we'd expect from N records".
> > Does
> > > > that
> > > > > > > > answer
> > > > > > > > > your
> > > > > > > > > > > > question?
> > > > > > > > > > > >
> > > > > > > > > > > > Cheers,
> > > > > > > > > > > >
> > > > > > > > > > > > Chris
> > > > > > > > > > > >
> > > > > > > > > > > > On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <yash.ma...@gmail.com> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Chris,
> > > > > > > > > > > > >
> > > > > > > > > > > > > 1. Thanks a lot for elaborating on this, I'm now
> > > > convinced
> > > > > > about
> > > > > > > > > the
> > > > > > > > > > > > > usefulness of the new offset reset endpoint.
> > Regarding
> > > > the
> > > > > > > > > follow-up
> > > > > > > > > > > KIP
> > > > > > > > > > > > > for a fine-grained offset write API, I'd be happy
> to
> > > take
> > > > > > that on
> > > > > > > > > once
> > > > > > > > > > > > this
> > > > > > > > > > > > > KIP is finalized and I will definitely look forward
> > to
> > > > your
> > > > > > > > > feedback on
> > > > > > > > > > > > > that one!
> > > > > > > > > > > > >
> > > > > > > > > > > > > 2. Gotcha, the motivation makes more sense to me
> now.
> > > So
> > > > > the
> > > > > > > > higher
> > > > > > > > > > > level
> > > > > > > > > > > > > partition field represents a Connect specific
> > "logical
> > > > > > partition"
> > > > > > > > > of
> > > > > > > > > > > > sorts
> > > > > > > > > > > > > - i.e. the source partition as defined by a
> connector
> > > for
> > > > > > source
> > > > > > > > > > > > connectors
> > > > > > > > > > > > > and a Kafka topic + partition for sink connectors.
> I
> > > like
> > > > > the
> > > > > > > > idea
> > > > > > > > > of
> > > > > > > > > > > > > adding a Kafka prefix to the lower level
> > > partition/offset
> > > > > > (and
> > > > > > > > > topic)
> > > > > > > > > > > > > fields which basically makes it more clear
> (although
> > > > > > implicitly)
> > > > > > > > > that
> > > > > > > > > > > the
> > > > > > > > > > > > > higher level partition/offset field is Connect
> > specific
> > > > and
> > > > > > not
> > > > > > > > the
> > > > > > > > > > > same
> > > > > > > > > > > > as
> > > > > > > > > > > > > what those terms represent in Kafka itself.
> However,
> > > this
> > > > > > then
> > > > > > > > > leads me
> > > > > > > > > > > > to
> > > > > > > > > > > > > wonder if we can make that explicit by including
> > > > "connect"
> > > > > or
> > > > > > > > > > > "connector"
> > > > > > > > > > > > > in the higher level field names? Or do you think
> this
> > > > isn't
> > > > > > > > > required
> > > > > > > > > > > > given
> > > > > > > > > > > > > that we're talking about a Connect specific REST
> API
> > in
> > > > the
> > > > > > first
> > > > > > > > > > > place?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 3. Thanks, I think the response structure
> definitely
> > > > looks
> > > > > > better
> > > > > > > > > now!
> > > > > > > > > > > > >
> > > > > > > > > > > > > 4. Interesting, I'd be curious to learn why we
> might
> > > want
> > > > > to
> > > > > > > > change
> > > > > > > > > > > this
> > > > > > > > > > > > in
> > > > > > > > > > > > > the future but that's probably out of scope for
> this
> > > > > > discussion.
> > > > > > > > > I'm
> > > > > > > > > > > not
> > > > > > > > > > > > > sure I followed why the unresolved writes to the
> > config
> > > > > topic
> > > > > > > > > would be
> > > > > > > > > > > an
> > > > > > > > > > > > > issue - wouldn't the delete offsets request be
> added
> > to
> > > > the
> > > > > > > > > herder's
> > > > > > > > > > > > > request queue and whenever it is processed, we'd
> > anyway
> > > > > need
> > > > > > to
> > > > > > > > > check
> > > > > > > > > > > if
> > > > > > > > > > > > > all the prerequisites for the request are
> satisfied?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 5. Thanks for elaborating that just fencing out the
> > > > > producer
> > > > > > > > still
> > > > > > > > > > > leaves
> > > > > > > > > > > > > many cases where source tasks remain hanging around
> > and
> > > > > also
> > > > > > that
> > > > > > > > > we
> > > > > > > > > > > > anyway
> > > > > > > > > > > > > can't have similar data production guarantees for
> > sink
> > > > > > connectors
> > > > > > > > > right
> > > > > > > > > > > > > now. I agree that it might be better to go with
> ease
> > of
> > > > > > > > > implementation
> > > > > > > > > > > > and
> > > > > > > > > > > > > consistency for now.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 6. Right, that does make sense but I still feel
> like
> > > the
> > > > > two
> > > > > > > > states
> > > > > > > > > > > will
> > > > > > > > > > > > > end up being confusing to end users who might not
> be
> > > able
> > > > > to
> > > > > > > > > discern
> > > > > > > > > > > the
> > > > > > > > > > > > > (fairly low-level) differences between them (also
> the
> > > > > > nuances of
> > > > > > > > > state
> > > > > > > > > > > > > transitions like STOPPED -> PAUSED or PAUSED ->
> > STOPPED
> > > > > with
> > > > > > the
> > > > > > > > > > > > > rebalancing implications as well). We can probably
> > > > revisit
> > > > > > this
> > > > > > > > > > > potential
> > > > > > > > > > > > > deprecation in the future based on user feedback
> and
> > > how
> > > > > the
> > > > > > > > > adoption
> > > > > > > > > > > of
> > > > > > > > > > > > > the new proposed stop endpoint looks like, what do
> > you
> > > > > think?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 7. Aha, that is completely my bad, I missed that
> the
> > > > v1/v2
> > > > > > state
> > > > > > > > is
> > > > > > > > > > > only
> > > > > > > > > > > > > applicable to the connector's target state and that
> > we
> > > > > don't
> > > > > > need
> > > > > > > > > to
> > > > > > > > > > > > worry
> > > > > > > > > > > > > about the tasks since we will have an empty set of
> > > > tasks. I
> > > > > > > > think I
> > > > > > > > > > > was a
> > > > > > > > > > > > > little confused by "pause the parts of the
> connector
> > > that
> > > > > > they
> > > > > > > > are
> > > > > > > > > > > > > assigned" from the KIP. Thanks for clarifying that!
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Some more thoughts and questions that I had -
> > > > > > > > > > > > >
> > > > > > > > > > > > > 8. Could you elaborate on what the implementation
> for
> > > > > offset
> > > > > > > > reset
> > > > > > > > > for
> > > > > > > > > > > > > source connectors would look like? Currently, it
> > > doesn't
> > > > > look
> > > > > > > > like
> > > > > > > > > we
> > > > > > > > > > > > track
> > > > > > > > > > > > > all the partitions for a source connector anywhere.
> > > Will
> > > > we
> > > > > > need
> > > > > > > > to
> > > > > > > > > > > > > book-keep this somewhere in order to be able to
> emit
> > a
> > > > > > tombstone
> > > > > > > > > record
> > > > > > > > > > > > for
> > > > > > > > > > > > > each source partition?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 9. The KIP describes the offset reset endpoint as
> > only
> > > > > being
> > > > > > > > > usable on
> > > > > > > > > > > > > existing connectors that are in a `STOPPED` state.
> > Why
> > > > > > wouldn't
> > > > > > > > we
> > > > > > > > > want
> > > > > > > > > > > > to
> > > > > > > > > > > > > allow resetting offsets for a deleted connector
> which
> > > > seems
> > > > > > to
> > > > > > > > be a
> > > > > > > > > > > valid
> > > > > > > > > > > > > use case? Or do we plan to handle this use case
> only
> > > via
> > > > > the
> > > > > > item
> > > > > > > > > > > > outlined
> > > > > > > > > > > > > in the future work section - "Automatically delete
> > > > offsets
> > > > > > with
> > > > > > > > > > > > > connectors"?
> > > > > > > > > > > > >
> > > > > > > > > > > > > 10. The KIP mentions that source offsets will be
> > reset
> > > > > > > > > transactionally
> > > > > > > > > > > > for
> > > > > > > > > > > > > each topic (worker global offset topic and
> connector
> > > > > specific
> > > > > > > > > offset
> > > > > > > > > > > > topic
> > > > > > > > > > > > > if it exists). While it obviously isn't possible to
> > > > > > atomically do
> > > > > > > > > the
> > > > > > > > > > > > > writes to two topics which may be on different
> Kafka
> > > > > > clusters,
> > > > > > > > I'm
> > > > > > > > > > > > > wondering about what would happen if the first
> > > > transaction
> > > > > > > > > succeeds but
> > > > > > > > > > > > the
> > > > > > > > > > > > > second one fails. I think the order of the two
> > > > transactions
> > > > > > > > matters
> > > > > > > > > > > here
> > > > > > > > > > > > -
> > > > > > > > > > > > > if we successfully emit tombstones to the connector
> > > > > specific
> > > > > > > > offset
> > > > > > > > > > > topic
> > > > > > > > > > > > > and fail to do so for the worker global offset
> topic,
> > > > we'll
> > > > > > > > > presumably
> > > > > > > > > > > > fail
> > > > > > > > > > > > > the offset delete request because the KIP mentions
> > that
> > > > "A
> > > > > > > > request
> > > > > > > > > to
> > > > > > > > > > > > reset
> > > > > > > > > > > > > offsets for a source connector will only be
> > considered
> > > > > > successful
> > > > > > > > > if
> > > > > > > > > > > the
> > > > > > > > > > > > > worker is able to delete all known offsets for that
> > > > > > connector, on
> > > > > > > > > both
> > > > > > > > > > > > the
> > > > > > > > > > > > > worker's global offsets topic and (if one is used)
> > the
> > > > > > > > connector's
> > > > > > > > > > > > > dedicated offsets topic.". However, this will lead
> to
> > > the
> > > > > > > > connector
> > > > > > > > > > > only
> > > > > > > > > > > > > being able to read potentially older offsets from
> the
> > > > > worker
> > > > > > > > global
> > > > > > > > > > > > offset
> > > > > > > > > > > > > topic on resumption (based on the combined offset
> > view
> > > > > > presented
> > > > > > > > as
> > > > > > > > > > > > > described in KIP-618 [1]). So, I think we should
> make
> > > > sure
> > > > > > that
> > > > > > > > the
> > > > > > > > > > > > worker
> > > > > > > > > > > > > global offset topic tombstoning is attempted first,
> > > > right?
> > > > > > Note
> > > > > > > > > that in
> > > > > > > > > > > > the
> > > > > > > > > > > > > current implementation of
> > > > > > `ConnectorOffsetBackingStore::set`, the
> > > > > > > > > > > > primary /
> > > > > > > > > > > > > connector specific offset store is written to
> first.
> > > > > > > > > > > > >
> > > > > > > > > > > > > 11. This probably isn't necessary to elaborate on
> in
> > > the
> > > > > KIP
> > > > > > > > > itself,
> > > > > > > > > > > but
> > > > > > > > > > > > I
> > > > > > > > > > > > > was wondering what the second offset test - "verify
> > > that
> > > > > that
> > > > > > > > those
> > > > > > > > > > > > offsets
> > > > > > > > > > > > > reflect an expected level of progress for each
> > > connector
> > > > > > (i.e.,
> > > > > > > > > they
> > > > > > > > > > > are
> > > > > > > > > > > > > greater than or equal to a certain value depending
> on
> > > how
> > > > > the
> > > > > > > > > > > connectors
> > > > > > > > > > > > > are configured and how long they have been
> running)"
> > -
> > > > > would
> > > > > > look
> > > > > > > > > like?
> > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Yash
> > > > > > > > > > > > >
> > > > > > > > > > > > > [1] -
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton <chr...@aiven.io.invalid> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Yash,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks for your detailed thoughts.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 1. In KAFKA-4107 [1], the primary request is
> > exactly
> > > > > what's
> > > > > > > > > proposed
> > > > > > > > > > > in
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > KIP right now: a way to reset offsets for
> > connectors.
> > > > > Sure,
> > > > > > > > > there's
> > > > > > > > > > > an
> > > > > > > > > > > > > > extra step of stopping the connector, but
> renaming
> > a
> > > > > > connector
> > > > > > > > > isn't
> > > > > > > > > > > as
> > > > > > > > > > > > > > convenient of an alternative as it may seem since
> > in
> > > > many
> > > > > > cases
> > > > > > > > > you'd
> > > > > > > > > > > > > also
> > > > > > > > > > > > > > want to delete the older one, so the complete
> > > sequence
> > > > of
> > > > > > steps
> > > > > > > > > would
> > > > > > > > > > > > be
> > > > > > > > > > > > > > something like delete the old connector, rename
> it
> > > > > > (possibly
> > > > > > > > > > > requiring
> > > > > > > > > > > > > > modifications to its config file, depending on
> > which
> > > > API
> > > > > is
> > > > > > > > > used),
> > > > > > > > > > > then
> > > > > > > > > > > > > > create the renamed variant. It's also just not a
> > > great
> > > > > user
> > > > > > > > > > > > > > experience--even if the practical impacts are
> > limited
> > > > > > (which,
> > > > > > > > > IMO,
> > > > > > > > > > > they
> > > > > > > > > > > > > are
> > > > > > > > > > > > > > not), people have been asking for years about why
> > > they
> > > > > > have to
> > > > > > > > > employ
> > > > > > > > > > > > > this
> > > > > > > > > > > > > > kind of a workaround for a fairly common use
> case,
> > > and
> > > > we
> > > > > > don't
> > > > > > > > > > > really
> > > > > > > > > > > > > have
> > > > > > > > > > > > > > a good answer beyond "we haven't implemented
> > > something
> > > > > > better
> > > > > > > > > yet".
> > > > > > > > > > > On
> > > > > > > > > > > > > top
> > > > > > > > > > > > > > of that, you may have external tooling that needs
> > to
> > > be
> > > > > > tweaked
> > > > > > > > > to
> > > > > > > > > > > > > handle a
> > > > > > > > > > > > > > new connector name, you may have strict
> > authorization
> > > > > > policies
> > > > > > > > > around
> > > > > > > > > > > > who
> > > > > > > > > > > > > > can access what connectors, you may have other
> ACLs
> > > > > > attached to
> > > > > > > > > the
> > > > > > > > > > > > name
> > > > > > > > > > > > > of
> > > > > > > > > > > > > > the connector (which can be especially common in
> > the
> > > > case
> > > > > > of
> > > > > > > > sink
> > > > > > > > > > > > > > connectors, whose consumer group IDs are tied to
> > > their
> > > > > > names by
> > > > > > > > > > > > default),
> > > > > > > > > > > > > > and leaving around state in the offsets topic
> that
> > > can
> > > > > > never be
> > > > > > > > > > > cleaned
> > > > > > > > > > > > > up
> > > > > > > > > > > > > > presents a bit of a footgun for users. It may not
> > be
> > > a
> > > > > > silver
> > > > > > > > > bullet,
> > > > > > > > > > > > but
> > > > > > > > > > > > > > providing some mechanism to reset that state is a
> > > step
> > > > in
> > > > > > the
> > > > > > > > > right
> > > > > > > > > > > > > > direction and allows responsible users to more
> > > > carefully
> > > > > > > > > administer
> > > > > > > > > > > > their
> > > > > > > > > > > > > > cluster without resorting to non-public APIs.
> That
> > > > said,
> > > > > I
> > > > > > do
> > > > > > > > > agree
> > > > > > > > > > > > that
> > > > > > > > > > > > > a
> > > > > > > > > > > > > > fine-grained reset/overwrite API would be useful,
> > and
> > > > I'd
> > > > > > be
> > > > > > > > > happy to
> > > > > > > > > > > > > > review a KIP to add that feature if anyone wants
> to
> > > > > tackle
> > > > > > it!
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 2. Keeping the two formats symmetrical is
> motivated
> > > > > mostly
> > > > > > by
> > > > > > > > > > > > aesthetics
> > > > > > > > > > > > > > and quality-of-life for programmatic interaction
> > with
> > > > the
> > > > > > API;
> > > > > > > > > it's
> > > > > > > > > > > not
> > > > > > > > > > > > > > really a goal to hide the use of consumer groups
> > from
> > > > > > users. I
> > > > > > > > do
> > > > > > > > > > > agree
> > > > > > > > > > > > > > that the format is a little strange-looking for
> > sink
> > > > > > > > connectors,
> > > > > > > > > but
> > > > > > > > > > > it
> > > > > > > > > > > > > > seemed like it would be easier to work with for
> > UIs,
> > > > > > casual jq
> > > > > > > > > > > queries,
> > > > > > > > > > > > > and
> > > > > > > > > > > > > > CLIs than a more Kafka-specific alternative such
> as
> > > > > > > > > > > > > {"<topic>-<partition>":
> > > > > > > > > > > > > > "<offset>"}, and although it is a little
> strange, I
> > > > don't
> > > > > > think
> > > > > > > > > it's
> > > > > > > > > > > > any
> > > > > > > > > > > > > > less readable or intuitive. That said, I've made
> > some
> > > > > > tweaks to
> > > > > > > > > the
> > > > > > > > > > > > > format
> > > > > > > > > > > > > > that should make programmatic access even easier;
> > > > > > specifically,
> > > > > > > > > I've
> > > > > > > > > > > > > > removed the "source" and "sink" wrapper fields
> and
> > > > > instead
> > > > > > > > moved
> > > > > > > > > them
> > > > > > > > > > > > > into
> > > > > > > > > > > > > > a top-level object with a "type" and "offsets"
> > field,
> > > > > just
> > > > > > like
> > > > > > > > > you
> > > > > > > > > > > > > > suggested in point 3 (thanks!). We might also
> > > consider
> > > > > > changing
> > > > > > > > > the
> > > > > > > > > > > > field
> > > > > > > > > > > > > > names for sink offsets from "topic", "partition",
> > and
> > > > > > "offset"
> > > > > > > > to
> > > > > > > > > > > > "Kafka
> > > > > > > > > > > > > > topic", "Kafka partition", and "Kafka offset"
> > > > > > respectively, to
> > > > > > > > > reduce
> > > > > > > > > > > > the
> > > > > > > > > > > > > > stuttering effect of having a "partition" field
> > > inside
> > > > a
> > > > > > > > > "partition"
> > > > > > > > > > > > > field
> > > > > > > > > > > > > > and the same with an "offset" field; thoughts?
> One
> > > > final
> > > > > > > > > point--by
> > > > > > > > > > > > > equating
> > > > > > > > > > > > > > source and sink offsets, we probably make it
> easier
> > > for
> > > > > > users
> > > > > > > > to
> > > > > > > > > > > > > understand
> > > > > > > > > > > > > > exactly what a source offset is; anyone who's
> > > familiar
> > > > > with
> > > > > > > > > consumer
> > > > > > > > > > > > > > offsets can see from the response format that we
> > > > > identify a
> > > > > > > > > logical
> > > > > > > > > > > > > > partition as a combination of two entities (a
> topic
> > > > and a
> > > > > > > > > partition
> > > > > > > > > > > > > > number); it should make it easier to grok what a
> > > source
> > > > > > offset
> > > > > > > > > is by
> > > > > > > > > > > > > seeing
> > > > > > > > > > > > > > what the two formats have in common.
>
> 3. Great idea! Done.
>
> 4. Yes, I'm thinking right now that a 409 will be the response status
> if a rebalance is pending. I'd rather not add this to the KIP as we
> may want to change it at some point and it doesn't seem vital to
> establish it as part of the public contract for the new endpoint right
> now. Also, small point--yes, a 409 is useful to avoid forwarding
> requests to an incorrect leader, but it's also useful to ensure that
> there aren't any unresolved writes to the config topic that might
> cause issues with the request (such as deleting the connector).
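>
> Assuming the endpoint reuses the REST API's usual error body (the
> message text here is illustrative), a rejected request would look
> something like:
>
>     DELETE /connectors/my-connector/offsets
>
>     HTTP/1.1 409 Conflict
>     {
>       "error_code": 409,
>       "message": "Cannot process request while a rebalance is pending"
>     }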
>
> 5. That's a good point--it may be misleading to call a connector
> STOPPED when it has zombie tasks lying around on the cluster. I don't
> think it'd be appropriate to do this synchronously while handling
> requests to the PUT /connectors/{connector}/stop since we'd want to
> give all currently-running tasks a chance to gracefully shut down,
> though. I'm also not sure that this is a significant problem, either.
> If the connector is resumed, then all zombie tasks will be
> automatically fenced out by their successors on startup; if it's
> deleted, then we'll have wasted effort by performing an unnecessary
> round of fencing. It may be nice to guarantee that source task
> resources will be deallocated after the connector transitions to
> STOPPED, but realistically, it doesn't do much to just fence out their
> producers, since tasks can be blocked on a number of other operations
> such as key/value/header conversion, transformation, and task polling.
> It may be a little strange if data is produced to Kafka after the
> connector has transitioned to STOPPED, but we can't provide the same
> guarantees for sink connectors, since their tasks may be stuck on a
> long-running SinkTask::put that emits data even after the Connect
> framework has abandoned them after exhausting their graceful shutdown
> timeout. Ultimately, I'd prefer to err on the side of consistency and
> ease of implementation for now, but I may be missing a case where a
> few extra records from a task that's slow to shut down may cause
> serious issues--let me know.
>
> 6. I'm hesitant to propose deprecation of the PAUSED state right now
> as it does serve a few purposes. Leaving tasks idling-but-ready makes
> resuming them less disruptive across the cluster, since a rebalance
> isn't necessary. It also reduces latency to resume the connector,
> especially for ones that have to do a lot of state gathering on
> initialization to, e.g., read offsets from an external system.
>
> 7. There should be no risk of mixed tasks after a downgrade, thanks to
> the empty set of task configs that gets published to the config topic.
> Both upgraded and downgraded workers will render an empty set of tasks
> for the connector, and keep that set of empty tasks until the
> connector is resumed. Does that address your concerns?
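>
> As a rough sketch (the key names and payloads here are a guess at the
> dual-state record this thread alludes to, not necessarily the exact
> on-disk format), stopping a connector would leave records along these
> lines in the config topic, readable by both old and new workers:
>
>     key: target-state-my-connector   value: {"state": "PAUSED", "state.v2": "STOPPED"}
>     key: commit-my-connector         value: {"tasks": 0}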
>
> You're also correct that the linked Jira ticket was wrong; thanks for
> pointing that out! Yes, KAFKA-4107 is the intended ticket, and I've
> updated the link in the KIP accordingly.
>
> Cheers,
>
> Chris
>
> [1] - https://issues.apache.org/jira/browse/KAFKA-4107
>
> On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com>
> wrote:
>
> Hi Chris,
>
> Thanks a lot for this KIP, I think something like this has been long
> overdue for Kafka Connect :)
>
> Some thoughts and questions that I had -
>
> 1. I'm wondering if you could elaborate a little more on the use case
> for the `DELETE /connectors/{connector}/offsets` API. I think we can
> all agree that a fine grained reset API that allows setting arbitrary
> offsets for partitions would be quite useful (which you talk about in
> the Future work section). But for the `DELETE
> /connectors/{connector}/offsets` API in its described form, it looks
> like it would only serve a seemingly niche use case where users want
> to avoid renaming connectors - because this new way of resetting
> offsets actually has more steps (i.e. stop the connector, reset
> offsets via the API, resume the connector) than simply deleting and
> re-creating the connector with a different name?
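>
> (For concreteness, the sequence being compared here maps to REST calls
> along these lines, with "my-connector" standing in for a real name:
>
>     PUT    /connectors/my-connector/stop
>     DELETE /connectors/my-connector/offsets
>     PUT    /connectors/my-connector/resume
>
> versus deleting the connector and re-creating it under a new name.)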
>
> 2. The KIP talks about taking care that the response formats
> (presumably only talking about the new GET API here) are symmetrical
> for both source and sink connectors - is the end goal to have users of
> Kafka Connect not even be aware that sink connectors use Kafka
> consumers under the hood (i.e. have that as purely an implementation
> detail abstracted away from users)? While I understand the value of
> uniformity here, the response format for sink connectors currently
> looks a little odd with the "partition" field having "topic" and
> "partition" as sub-fields, especially to users familiar with Kafka
> semantics. Thoughts?
>
> 3. Another little nitpick on the response format - why do we need
> "source" / "sink" as a field under "offsets"? Users can query the
> connector type via the existing `GET /connectors` API. If it's deemed
> important to let users know that the offsets they're seeing correspond
> to a source / sink connector, maybe we could have a top level field
> "type" in the response for the `GET /connectors/{connector}/offsets`
> API similar to the `GET /connectors` API?
>
> 4. For the `DELETE /connectors/{connector}/offsets` API, the KIP
> mentions that requests will be rejected if a rebalance is pending -
> presumably this is to avoid forwarding requests to a leader which may
> no longer be the leader after the pending rebalance? In this case, the
> API will return a `409 Conflict` response similar to some of the
> existing APIs, right?
>
> 5. Regarding fencing out previously running tasks for a connector, do
> you think it would make more sense semantically to have this
> implemented in the stop endpoint where an empty set of tasks is
> generated, rather than the delete offsets endpoint? This would also
> give the new `STOPPED` state a higher confidence of sorts, with any
> zombie tasks being fenced off from continuing to produce data.
>
> 6. Thanks for outlining the issues with the current state of the
> `PAUSED` state - I think a lot of users expect it to behave like the
> `STOPPED` state you outline in the KIP and are (unpleasantly)
> surprised when it doesn't. However, this does beg the question of what
> the usefulness of having two separate `PAUSED` and `STOPPED` states
> is? Do we want to continue supporting both these states in the future,
> or do you see the `STOPPED` state eventually causing the existing
> `PAUSED` state to be deprecated?
>
> 7. I think the idea outlined in the KIP for handling a new state
> during cluster downgrades / rolling upgrades is quite clever, but do
> you think there could be any issues with having a mix of "paused" and
> "stopped" tasks for the same connector across workers in a cluster? At
> the very least, I think it would be fairly confusing to most users.
> I'm wondering if this can be avoided by stating clearly in the KIP
> that the new `PUT /connectors/{connector}/stop` can only be used on a
> cluster that is fully upgraded to an AK version newer than the one
> which ends up containing changes from this KIP, and that if a cluster
> needs to be downgraded to an older version, the user should ensure
> that none of the connectors on the cluster are in a stopped state?
> With the existing implementation, it looks like an unknown/invalid
> target state record is basically just discarded (with an error message
> logged), so it doesn't seem to be a disastrous failure scenario that
> can bring down a worker.
>
> Thanks,
> Yash
>
> On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> Hi Ashwin,
>
> Thanks for your thoughts. Regarding your questions:
>
> 1. The response would show the offsets that are visible to the source
> connector, so it would combine the contents of the two topics, giving
> priority to offsets present in the connector-specific topic. I'm
> imagining a follow-up question that some people may have in response
> to that is whether we'd want to provide insight into the contents of a
> single topic at a time. It may be useful to be able to see this
> information in order to debug connector issues or verify that it's
> safe to stop using a connector-specific offsets topic (either
> explicitly, or implicitly via cluster downgrade). What do you think
> about adding a URL query parameter that allows users to dictate which
> view of the connector's offsets they are given in the REST response,
> with options for the worker's global topic, the connector-specific
> topic, and the combined view of them that the connector and its tasks
> see (which would be the default)? This may be too much for V1 but it
> feels like it's at least worth exploring a bit.
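>
> For example, with a hypothetical parameter name (nothing has been
> settled on yet):
>
>     GET /connectors/my-connector/offsets                  (combined view; the default)
>     GET /connectors/my-connector/offsets?store=worker     (worker's global offsets topic)
>     GET /connectors/my-connector/offsets?store=connector  (connector-specific offsets topic)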
>
> 2. There is no option for this at the moment. Reset semantics are
> extremely coarse-grained; for source connectors, we delete all source
> offsets, and for sink connectors, we delete the entire consumer group.
> I'm hoping this will be enough for V1 and that, if there's sufficient
> demand for it, we can introduce a richer API for resetting or even
> modifying connector offsets in a follow-up KIP.
>
> 3. Good eye :) I think it's fine to keep the existing behavior for the
> PAUSED state with the Connector instance, since the primary purpose of
> the Connector is to generate task configs and monitor the external
> system for changes. If there's no chance for tasks to be running
> anyways, I don't see much value in allowing paused connectors to
> generate new task configs, especially since each time that happens a
> rebalance is triggered and there's a non-zero cost to that. What do
> you think?
>
> Cheers,
>
> Chris
>
> On Fri, Oct 14, 2022 at 12:59 AM Ashwin <apan...@confluent.io.invalid>
> wrote:
>
> Thanks for the KIP Chris - I think this is a useful feature.
>
> Can you please elaborate on the following in the KIP -
>
> 1. What would the response of GET /connectors/{connector}/offsets look
> like if the worker has both a global and a connector-specific offsets
> topic?
>
> 2. How can we pass reset options like shift-by, to-date-time etc.
> using a REST API like DELETE /connectors/{connector}/offsets?
>
> 3. Today the PAUSE operation on a connector invokes its stop method -
> will there be a change here to reduce confusion with the new proposed
> STOPPED state?
>
> Thanks,
> Ashwin
>
> On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> Hi all,
>
> I noticed a fairly large gap in the first version of this KIP that I
> published last Friday, which has to do with accommodating connectors
> that target different Kafka clusters than the one that the Kafka
> Connect cluster uses for its internal topics and source connectors
> with dedicated offsets topics. I've since updated the KIP to address
> this gap, which has substantially altered the design. Wanted to give a
> heads-up to anyone that's already started reviewing.
>
> Cheers,
>
> Chris
>
> On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io> wrote:
>
> Hi all,
>
> I'd like to begin discussion on a KIP to add offsets support to the
> Kafka Connect REST API:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
>
> Cheers,
>
> Chris
>
> On Fri, Nov 4, 2022 at 10:27 PM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> Hi Yash,
>
> Thanks again for your thoughts! Responses to ongoing discussions
> inline (easier to track context than referencing comment numbers):
>
> > However, this then leads me to wonder if we can make that explicit
> > by including "connect" or "connector" in the higher level field
> > names? Or do you think this isn't required given that we're talking
> > about a Connect specific REST API in the first place?
>
> I think "partition" and "offset" are fine as field names but I'm not
> hugely opposed to adding "connector" as a prefix to them; would be
> interested in others' thoughts.
>
> > I'm not sure I followed why the unresolved writes to the config
> > topic would be an issue - wouldn't the delete offsets request be
> > added to the herder's request queue and whenever it is processed,
> > we'd anyway need to check if all the prerequisites for the request
> > are satisfied?
>
> Some requests are handled in multiple steps. For example, deleting a
> connector (1) adds a request to the herder queue to write a tombstone
> to the config topic (or, if the worker isn't the leader, forward the
> request to the leader). (2) Once that tombstone is picked up, (3) a
> rebalance ensues, and then after it's finally complete, (4) the
> connector and its tasks are shut down. I probably could have used
> better terminology, but what I meant by "unresolved writes to the
> config topic" was a case in between steps (2) and (3)--where the
> worker has already read that tombstone from the config topic and knows
> that a rebalance is pending, but hasn't begun participating in that
> rebalance yet. In the DistributedHerder class, this is done via the
> `checkRebalanceNeeded` method.
>
> > We can probably revisit this potential deprecation [of the PAUSED
> > state] in the future based on user feedback and how the adoption of
> > the new proposed stop endpoint looks like, what do you think?
>
> Yeah, revisiting in the future seems reasonable. 👍
>
> And responses to new comments here:
>
> 8. Yep, we'll start tracking offsets by connector. I don't believe
> this should be too difficult, and suspect that the only reason we
> track raw byte arrays instead of pre-deserializing offset topic
> information into something more useful is because Connect originally
> had pluggable internal converters. Now that we're hardcoded to use the
> JSON converter it should be fine to track offsets on a per-connector
> basis as they're read from the offsets topic.
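>
> For reference, records in the offsets topic are already keyed by
> connector name, roughly along these lines (the connector name and the
> file-style partition/offset values are just an example):
>
>     key:   ["my-source-connector", {"filename": "/tmp/input.txt"}]
>     value: {"position": 1234}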
>
> 9. I'm hesitant to introduce this type of feature right now because of
> all of the gotchas that would come with it. In security-conscious
> environments, it's possible that a sink connector's principal may have
> access to the consumer group used by the connector, but the worker's
> principal may not. There's also the case where source connectors have
> separate offsets topics, or sink connectors have overridden consumer
> group IDs, or sink or source connectors work against a different Kafka
> cluster than the one that their worker uses. Overall, I'd rather
> provide a single API that works in all cases rather than risk
> confusing and alienating users by trying to make their lives easier in
> a subset of cases.
>
> 10. Hmm... I don't think the order of the writes matters too much
> here, but we probably could start by deleting from the global topic
> first, that's true. The reason I'm not hugely concerned about this
> case is that if something goes wrong while resetting offsets, there's
> no immediate impact--the connector will still be in the STOPPED state.
> The REST response for requests to reset the offsets will clearly call
> out that the operation has failed, and if necessary, we can probably
> also add a scary-looking warning message stating that we can't
> guarantee which offsets have been successfully wiped and which
> haven't. Users can query the exact offsets of the connector at this
> point to determine what will happen if/when they resume it. And they
> can repeat attempts to reset the offsets as many times as they'd like
> until they get back a 2XX response, indicating that it's finally safe
> to resume the connector. Thoughts?
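>
> Sketching that retry flow (the statuses and annotations here are
> illustrative, not part of the proposed contract):
>
>     DELETE /connectors/my-connector/offsets
>     HTTP/1.1 500 Internal Server Error   (some offsets may remain)
>
>     DELETE /connectors/my-connector/offsets
>     HTTP/1.1 200 OK                      (all offsets wiped; safe to resume)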
>
> 11. I haven't thought too much about it. I think something like the
> Monitorable* connectors would probably serve our needs here; we can
> instantiate them on a running Connect cluster and then use various
> handles to know how many times they've been polled, committed records,
> etc. If necessary we can tweak those classes or even write our own.
> But anyways, once that's all done, the test will be something like
> "create a connector, wait for it to produce N records (each of which
> contains some kind of predictable offset), and ensure that the offsets
> for it in the REST API match up with the ones we'd expect from N
> records". Does that answer your question?
>
> Cheers,
>
> Chris
>
> On Tue, Oct 18, 2022 at 3:28 AM Yash Mayya <yash.ma...@gmail.com>
> wrote:
>
> Hi Chris,
>
> 1. Thanks a lot for elaborating on this, I'm now convinced about the
> usefulness of the new offset reset endpoint. Regarding the follow-up
> KIP for a fine-grained offset write API, I'd be happy to take that on
> once this KIP is finalized and I will definitely look forward to your
> feedback on that one!
>
> 2. Gotcha, the motivation makes more sense to me now. So the higher
> level partition field represents a Connect specific "logical
> partition" of sorts - i.e. the source partition as defined by a
> connector for source connectors and a Kafka topic + partition for sink
> connectors. I like the idea of adding a Kafka prefix to the lower
> level partition/offset (and topic) fields which basically makes it
> more clear (although implicitly) that the higher level
> partition/offset field is Connect specific and not the same as what
> those terms represent in Kafka itself. However, this then leads me to
> wonder if we can make that explicit by including "connect" or
> "connector" in the higher level field names? Or do you think this
> isn't required given that we're talking about a Connect specific REST
> API in the first place?
>
> 3. Thanks, I think the response structure definitely looks better now!
>
> 4. Interesting, I'd be curious to learn why we might want to change
> this in the future but that's probably out of scope for this
> discussion. I'm not sure I followed why the unresolved writes to the
> config topic would be an issue - wouldn't the delete offsets request
> be added to the herder's request queue and whenever it is processed,
> we'd anyway need to check if all the prerequisites for the request are
> satisfied?
>
> 5. Thanks for elaborating that just fencing out the producer still
> leaves many cases where source tasks remain hanging around and also
> that we anyway can't have similar data production guarantees for sink
> connectors right now. I agree that it might be better to go with ease
> of implementation and consistency for now.
>
> 6. Right, that does make sense but I still feel like the two states
> will end up being confusing to end users who might not be able to
> discern the (fairly low-level) differences between them (also the
> nuances of state transitions like STOPPED -> PAUSED or PAUSED ->
> STOPPED with the rebalancing implications as well). We can probably
> revisit this potential deprecation in the future based on user
> feedback and how the adoption of the new proposed stop endpoint looks
> like, what do you think?
>
> 7. Aha, that is completely my bad, I missed that the v1/v2 state is
> only applicable to the connector's target state and that we don't need
> to worry about the tasks since we will have an empty set of tasks. I
> think I was a little confused by "pause the parts of the connector
> that they are assigned" from the KIP. Thanks for clarifying that!
>
> Some more thoughts and questions that I had -
>
> 8. Could you elaborate on what the implementation for offset reset for
> source connectors would look like? Currently, it doesn't look like we
> track all the partitions for a source connector anywhere. Will we need
> to book-keep this somewhere in order to be able to emit a tombstone
> record for each source partition?
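>
> (Presumably each such tombstone would be an offsets topic record key
> with a null value, e.g., reusing the illustrative record from earlier:
>
>     key:   ["my-source-connector", {"filename": "/tmp/input.txt"}]
>     value: null
>
> with one such record needed per known source partition.)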
>
> 9. The KIP describes the offset reset endpoint as only being usable on
> existing connectors that are in a `STOPPED` state. Why wouldn't we
> want to allow resetting offsets for a deleted connector which seems to
> be a valid use case? Or do we plan to handle this use case only via
> the item outlined in the future work section - "Automatically delete
> offsets with connectors"?
>
> 10. The KIP mentions that source offsets will be reset transactionally
> for each topic (worker global offset topic and connector specific
> offset topic if it exists). While it obviously isn't possible to
> atomically do the writes to two topics which may be on different Kafka
> clusters, I'm wondering about what would happen if the first
> transaction succeeds but the second one fails. I think the order of
> the two transactions matters here - if we successfully emit tombstones
> to the connector specific offset topic and fail to do so for the
> worker global offset topic, we'll presumably fail the offset delete
> request because the KIP mentions that "A request to reset offsets for
> a source connector will only be considered successful if the worker is
> able to delete all known offsets for that connector, on both the
> worker's global offsets topic and (if one is used) the connector's
> dedicated offsets topic.". However, this will lead to the connector
> only being able to read potentially older offsets from the worker
> global offset topic on resumption (based on the combined offset view
> presented as described in KIP-618 [1]). So, I think we should make
> sure that the worker global offset topic tombstoning is attempted
> first, right? Note that in the current implementation of
> `ConnectorOffsetBackingStore::set`, the primary / connector specific
> offset store is written to first.
>
> 11. This probably isn't necessary to elaborate on in the KIP itself,
> but I was wondering what the second offset test - "verify that those
> offsets reflect an expected level of progress for each connector
> (i.e., they are greater than or equal to a certain value depending on
> how the connectors are configured and how long they have been
> running)" - would look like?
>
> Thanks,
> Yash
>
> [1] -
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=153817402#KIP618:ExactlyOnceSupportforSourceConnectors-Smoothmigration
>
> On Tue, Oct 18, 2022 at 12:42 AM Chris Egerton <chr...@aiven.io.invalid>
> wrote:
>
> Hi Yash,
>
> Thanks for your detailed thoughts.
>
> 1. In KAFKA-4107 [1], the primary request is exactly what's proposed
> in the KIP right now: a way to reset offsets for connectors. Sure,
> there's an extra step of stopping the connector, but renaming a
> connector isn't as convenient of an alternative as it may seem since
> in many cases you'd also want to delete the older one, so the complete
> sequence of steps would be something like delete the old connector,
> rename it (possibly requiring modifications to its config file,
> depending on which API is used), then create the renamed variant. It's
> also just not a great user experience--even if the practical impacts
> are limited (which, IMO, they are not), people have been asking for
> years about why they have to employ this kind of a workaround for a
> fairly common use case, and we don't really have a good answer beyond
> "we haven't implemented something better yet". On top of that, you may
> have external tooling that needs to be tweaked to handle a new
> connector name, you may have strict authorization policies around who
> can access what connectors, you may have other ACLs attached to the
> name of the connector (which can be especially common in the case of
> sink connectors, whose consumer group IDs are tied to their names by
> default), and leaving around state in the offsets topic that can never
> be cleaned up presents a bit of a footgun for users. It may not be a
> silver bullet, but providing some mechanism to reset that state is a
> step in the right direction and allows responsible users to more
> carefully administer their cluster without resorting to non-public
> APIs. That said, I do agree that a fine-grained reset/overwrite API
> would be useful, and I'd be happy to review a KIP to add that feature
> if anyone wants to tackle it!
>
> 2. Keeping the two formats symmetrical is motivated mostly by
> aesthetics and quality-of-life for programmatic interaction with the
> API; it's not really a goal to hide the use of consumer groups from
> users. I do agree that the format is a little strange-looking for sink
> connectors, but it seemed like it would be easier to work with for
> UIs, casual jq queries, and CLIs than a more Kafka-specific
> alternative such as {"<topic>-<partition>": "<offset>"}, and although
> it is a little strange, I don't think it's any less readable or
> intuitive. That said, I've made some tweaks to the format that should
> make programmatic access even easier; specifically, I've removed the
> "source" and "sink" wrapper fields and instead moved them into a
> top-level object with a "type" and "offsets" field, just like you
> suggested in point 3 (thanks!). We might also consider changing the
> field names for sink offsets from "topic", "partition", and "offset"
> to "Kafka topic", "Kafka partition", and "Kafka offset" respectively,
> to reduce the stuttering effect of having a "partition" field inside a
> "partition" field and the same with an "offset" field; thoughts? One
> final point--by equating source and sink offsets, we probably make it
> easier for users to understand exactly what a source offset is; anyone
> who's familiar with consumer offsets can see from the response format
> that we identify a logical partition as a combination of two entities
> (a topic and a partition number); it should make it easier to grok
> what a source offset is by seeing what the two formats have in common.
>
> 3. Great idea! Done.
>
> 4. Yes, I'm thinking right now that a 409 will be the response status
> if a rebalance is pending. I'd rather not add this to the KIP as we
> may want to change it at some point and it doesn't seem vital to
> establish it as part of the public contract for the new endpoint right
> now. Also, small point--yes, a 409 is useful to avoid forwarding
> requests to an incorrect leader, but it's also useful to ensure that
> there aren't any unresolved writes to the config topic that might
> cause issues with the request (such as deleting the connector).
>
> 5. That's a good point--it may be misleading to call a connector
> STOPPED when it has zombie tasks lying around on the cluster. I don't
> think it'd be appropriate to do this synchronously while handling
> requests to the PUT /connectors/{connector}/stop since we'd want to
> give all currently-running tasks a chance to gracefully shut down,
> though. I'm also not sure that this is a significant problem, either.
> If the connector is resumed, then all zombie tasks will be
> automatically fenced out by their successors on startup; if it's
> deleted, then we'll have wasted effort by performing an unnecessary
> round of fencing. It may be nice to guarantee that source task
> resources will be deallocated after the connector transitions to
> STOPPED, but realistically, it doesn't do much to just fence out their
> producers, since tasks can be blocked on a number of other operations
> such as key/value/header conversion, transformation, and task polling.
> It may be a little strange if data is produced to Kafka after the
> connector has transitioned to STOPPED, but we can't provide the same
> guarantees for sink connectors, since their tasks may be stuck on a
> long-running SinkTask::put that emits data even after the Connect
> framework has abandoned them after exhausting their graceful shutdown
> timeout. Ultimately, I'd prefer to err on the side of consistency and
> ease of implementation for now, but I may be missing a case where a
> few extra records from a task that's slow to shut down may cause
> serious issues--let me know.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 6. I'm hesitant to propose deprecation of the
> > PAUSED
> > > > > state
> > > > > > > > right
> > > > > > > > > now
> > > > > > > > > > > as
> > > > > > > > > > > > > it
> > > > > > > > > > > > > > does serve a few purposes. Leaving tasks
> > > > idling-but-ready
> > > > > > makes
> > > > > > > > > > > > resuming
> > > > > > > > > > > > > > them less disruptive across the cluster, since a
> > > > > rebalance
> > > > > > > > isn't
> > > > > > > > > > > > > necessary.
> > > > > > > > > > > > > > It also reduces latency to resume the connector,
> > > > > > especially for
> > > > > > > > > ones
> > > > > > > > > > > > that
> > > > > > > > > > > > > > have to do a lot of state gathering on
> > initialization
> > > > to,
> > > > > > e.g.,
> > > > > > > > > read
> > > > > > > > > > > > > > offsets from an external system.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > 7. There should be no risk of mixed tasks after a
> > > > > > downgrade,
> > > > > > > > > thanks
> > > > > > > > > > > to
> > > > > > > > > > > > > the
> > > > > > > > > > > > > > empty set of task configs that gets published to
> > the
> > > > > config
> > > > > > > > > topic.
> > > > > > > > > > > Both
> > > > > > > > > > > > > > upgraded and downgraded workers will render an
> > empty
> > > > set
> > > > > of
> > > > > > > > > tasks for
> > > > > > > > > > > > the
> > > > > > > > > > > > > > connector, and keep that set of empty tasks until
> > the
> > > > > > connector
> > > > > > > > > is
> > > > > > > > > > > > > resumed.
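>
> Purely for illustration (these internal record formats are
> implementation details and may well change), the sequence written to the
> config topic when a connector is stopped would look something like:
>
>     key: "target-state-my-connector"   value: {"state": "STOPPED"}
>     key: "commit-my-connector"         value: {"tasks": 0}
>
> so any worker that reads the topic, upgraded or downgraded, ends up with
> zero task configs for the connector.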
>
> Does that address your concerns?
>
> You're also correct that the linked Jira ticket was wrong; thanks for
> pointing that out! Yes, KAFKA-4107 [1] is the intended ticket, and I've
> updated the link in the KIP accordingly.
>
> Cheers,
>
> Chris
>
> [1] - https://issues.apache.org/jira/browse/KAFKA-4107
>
> On Sun, Oct 16, 2022 at 10:42 AM Yash Mayya <yash.ma...@gmail.com> wrote:
>
> > Hi Chris,
> >
> > Thanks a lot for this KIP, I think something like this has been long
> > overdue for Kafka Connect :)
> >
> > Some thoughts and questions that I had -
> >
> > 1. I'm wondering if you could elaborate a little more on the use case
> > for the `DELETE /connectors/{connector}/offsets` API. I think we can
> > all agree that a fine-grained reset API that allows setting arbitrary
> > offsets for partitions would be quite useful (which you talk about in
> > the Future work section). But for the `DELETE
> > /connectors/{connector}/offsets` API in its described form, it looks
> > like it would only serve a seemingly niche use case where users want to
> > avoid renaming connectors - because this new way of resetting offsets
> > actually has more steps (i.e. stop the connector, reset offsets via the
> > API, resume the connector) than simply deleting and re-creating the
> > connector with a different name?
> >
> > 2. The KIP talks about taking care that the response formats
> > (presumably only talking about the new GET API here) are symmetrical
> > for both source and sink connectors - is the end goal to have users of
> > Kafka Connect not even be aware that sink connectors use Kafka
> > consumers under the hood (i.e. have that as purely an implementation
> > detail abstracted away from users)? While I understand the value of
> > uniformity here, the response format for sink connectors currently
> > looks a little odd with the "partition" field having "topic" and
> > "partition" as sub-fields, especially to users familiar with Kafka
> > semantics. Thoughts?
> >
> > 3. Another little nitpick on the response format - why do we need
> > "source" / "sink" as a field under "offsets"? Users can query the
> > connector type via the existing `GET /connectors` API. If it's deemed
> > important to let users know that the offsets they're seeing correspond
> > to a source / sink connector, maybe we could have a top-level field
> > "type" in the response for the `GET /connectors/{connector}/offsets`
> > API, similar to the `GET /connectors` API?
> >
> > 4. For the `DELETE /connectors/{connector}/offsets` API, the KIP
> > mentions that requests will be rejected if a rebalance is pending -
> > presumably this is to avoid forwarding requests to a leader which may
> > no longer be the leader after the pending rebalance? In this case, the
> > API will return a `409 Conflict` response similar to some of the
> > existing APIs, right?
> >
> > 5. Regarding fencing out previously running tasks for a connector, do
> > you think it would make more sense semantically to have this
> > implemented in the stop endpoint where an empty set of tasks is
> > generated, rather than the delete offsets endpoint? This would also
> > give the new `STOPPED` state a higher confidence of sorts, with any
> > zombie tasks being fenced off from continuing to produce data.
> >
> > 6. Thanks for outlining the issues with the current `PAUSED` state - I
> > think a lot of users expect it to behave like the `STOPPED` state you
> > outline in the KIP and are (unpleasantly) surprised when it doesn't.
> > However, this does beg the question of what the usefulness of having
> > two separate `PAUSED` and `STOPPED` states is? Do we want to continue
> > supporting both these states in the future, or do you see the `STOPPED`
> > state eventually causing the existing `PAUSED` state to be deprecated?
> >
> > 7. I think the idea outlined in the KIP for handling a new state during
> > cluster downgrades / rolling upgrades is quite clever, but do you think
> > there could be any issues with having a mix of "paused" and "stopped"
> > tasks for the same connector across workers in a cluster? At the very
> > least, I think it would be fairly confusing to most users. I'm
> > wondering if this can be avoided by stating clearly in the KIP that the
> > new `PUT /connectors/{connector}/stop` endpoint can only be used on a
> > cluster that is fully upgraded to an AK version newer than the one
> > which ends up containing changes from this KIP, and that if a cluster
> > needs to be downgraded to an older version, the user should ensure that
> > none of the connectors on the cluster are in a stopped state? With the
> > existing implementation, it looks like an unknown/invalid target state
> > record is basically just discarded (with an error message logged), so
> > it doesn't seem to be a disastrous failure scenario that can bring down
> > a worker.
> >
> > Thanks,
> > Yash
> >
> > On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi Ashwin,
> > >
> > > Thanks for your thoughts. Regarding your questions:
> > >
> > > 1. The response would show the offsets that are visible to the source
> > > connector, so it would combine the contents of the two topics, giving
> > > priority to offsets present in the connector-specific topic. A
> > > follow-up question that some people may have in response to that is
> > > whether we'd want to provide insight into the contents of a single
> > > topic at a time. It may be useful to be able to see this information
> > > in order to debug connector issues, or to verify that it's safe to
> > > stop using a connector-specific offsets topic (either explicitly, or
> > > implicitly via cluster downgrade). What do you think about adding a
> > > URL query parameter that allows users to dictate which view of the
> > > connector's offsets they are given in the REST response, with options
> > > for the worker's global topic, the connector-specific topic, and the
> > > combined view of the two that the connector and its tasks see (which
> > > would be the default)? This may be too much for V1 but it feels like
> > > it's at least worth exploring a bit.
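> > >
> > > Concretely, that might look something like this (the parameter name
> > > and values here are just a sketch, not a concrete proposal):
> > >
> > >     # combined view (the default): offsets in the connector-specific
> > >     # topic take precedence over entries in the worker's global topic
> > >     GET /connectors/my-source/offsets?view=combined
> > >
> > >     # contents of the worker's global offsets topic only
> > >     GET /connectors/my-source/offsets?view=worker
> > >
> > >     # contents of the connector-specific offsets topic only
> > >     GET /connectors/my-source/offsets?view=connector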
> > >
> > > 2. There is no option for this at the moment. Reset semantics are
> > > extremely coarse-grained; for source connectors, we delete all source
> > > offsets, and for sink connectors, we delete the entire consumer
> > > group. I'm hoping this will be enough for V1 and that, if there's
> > > sufficient demand for it, we can introduce a richer API for resetting
> > > or even modifying connector offsets in a follow-up KIP.
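> > >
> > > For a sink connector, the effect is roughly what you'd get by
> > > deleting its consumer group by hand (assuming the default
> > > "connect-<name>" group naming convention and no group.id override in
> > > the connector config):
> > >
> > >     bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
> > >       --delete --group connect-my-sink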
> > >
> > > 3. Good eye :) I think it's fine to keep the existing behavior for
> > > the PAUSED state with the Connector instance, since the primary
> > > purpose of the Connector is to generate task configs and monitor the
> > > external system for changes. If there's no chance for tasks to be
> > > running anyways, I don't see much value in allowing paused connectors
> > > to generate new task configs, especially since each time that happens
> > > a rebalance is triggered and there's a non-zero cost to that. What do
> > > you think?
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Fri, Oct 14, 2022 at 12:59 AM Ashwin <apan...@confluent.io.invalid>
> > > wrote:
> > >
> > > > Thanks for the KIP, Chris - I think this is a useful feature.
> > > >
> > > > Can you please elaborate on the following in the KIP -
> > > >
> > > > 1. What would the response of GET /connectors/{connector}/offsets
> > > > look like if the worker has both a global and a connector-specific
> > > > offsets topic?
> > > >
> > > > 2. How can we pass reset options like shift-by, to-date-time, etc.
> > > > using a REST API like DELETE /connectors/{connector}/offsets?
> > > >
> > > > 3. Today, the PAUSE operation on a connector invokes its stop
> > > > method - will there be a change here to reduce confusion with the
> > > > newly proposed STOPPED state?
> > > >
> > > > Thanks,
> > > > Ashwin
> > > >
> > > > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton
> > > > <chr...@aiven.io.invalid> wrote:
> > > >
> > > > > Hi all,
> > > > >
> > > > > I noticed a fairly large gap in the first version of this KIP
> > > > > that I published last Friday, which has to do with accommodating
> > > > > connectors that target different Kafka clusters than the one that
> > > > > the Kafka Connect cluster uses for its internal topics, as well
> > > > > as source connectors with dedicated offsets topics. I've since
> > > > > updated the KIP to address this gap, which has substantially
> > > > > altered the design. Wanted to give a heads-up to anyone that's
> > > > > already started reviewing.
> > > > >
> > > > > Cheers,
> > > > >
> > > > > Chris
> > > > >
> > > > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io>
> > > > > wrote:
> > > > >
> > > > > > Hi all,
> > > > > >
> > > > > > I'd like to begin discussion on a KIP to add offsets support to
> > > > > > the Kafka Connect REST API:
> > > > > > https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > > > >
> > > > > > Cheers,
> > > > > >
> > > > > > Chris
