Hi Chris,

Thanks a lot for this KIP, I think something like this has been long
overdue for Kafka Connect :)

Some thoughts and questions that I had -

1. I'm wondering if you could elaborate a little more on the use case for
the `DELETE /connectors/{connector}/offsets` API. I think we can all agree
that a fine grained reset API that allows setting arbitrary offsets for
partitions would be quite useful (which you talk about in the Future work
section). But for the `DELETE /connectors/{connector}/offsets` API in its
described form, it looks like it would only serve a seemingly niche use
case where users want to avoid renaming connectors - because this new way
of resetting offsets actually has more steps (i.e. stop the connector,
reset offsets via the API, resume the connector) than simply deleting and
re-creating the connector with a different name?

2. The KIP talks about taking care that the response formats (presumably
only talking about the new GET API here) are symmetrical for both source
and sink connectors - is the end goal to have users of Kafka Connect not
even be aware that sink connectors use Kafka consumers under the hood (i.e.
have that as purely an implementation detail abstracted away from users)?
While I understand the value of uniformity here, the response format for
sink connectors currently looks a little odd with the "partition" field
having "topic" and "partition" as sub-fields, especially to users familiar
with Kafka semantics. Thoughts?

3. Another little nitpick on the response format - why do we need "source"
/ "sink" as a field under "offsets"? Users can query the connector type via
the existing `GET /connectors` API. If it's deemed important to let users
know that the offsets they're seeing correspond to a source / sink
connector, maybe we could have a top level field "type" in the response for
the `GET /connectors/{connector}/offsets` API similar to the `GET
/connectors` API?

4. For the `DELETE /connectors/{connector}/offsets` API, the KIP mentions
that requests will be rejected if a rebalance is pending - presumably this
is to avoid forwarding requests to a leader which may no longer be the
leader after the pending rebalance? In this case, the API will return a
`409 Conflict` response similar to some of the existing APIs, right?

5. Regarding fencing out previously running tasks for a connector, do you
think it would make more sense semantically to have this implemented in the
stop endpoint where an empty set of tasks is generated, rather than the
delete offsets endpoint? This would also give the new `STOPPED` state a
higher confidence of sorts, with any zombie tasks being fenced off from
continuing to produce data.

6. Thanks for outlining the issues with the current state of the `PAUSED`
state - I think a lot of users expect it to behave like the `STOPPED` state
you outline in the KIP and are (unpleasantly) surprised when it doesn't.
However, this does beg the question of what the usefulness of having two
separate `PAUSED` and `STOPPED` states is? Do we want to continue
supporting both these states in the future, or do you see the `STOPPED`
state eventually causing the existing `PAUSED` state to be deprecated?

7. I think the idea outlined in the KIP for handling a new state during
cluster downgrades / rolling upgrades is quite clever, but do you think
there could be any issues with having a mix of "paused" and "stopped" tasks
for the same connector across workers in a cluster? At the very least, I
think it would be fairly confusing to most users. I'm wondering if this can
be avoided by stating clearly in the KIP that the new `PUT
/connectors/{connector}/stop`
can only be used on a cluster that is fully upgraded to an AK version newer
than the one which ends up containing changes from this KIP and that if a
cluster needs to be downgraded to an older version, the user should ensure
that none of the connectors on the cluster are in a stopped state? With the
existing implementation, it looks like an unknown/invalid target state
record is basically just discarded (with an error message logged), so it
doesn't seem to be a disastrous failure scenario that can bring down a
worker.


Thanks,
Yash

On Fri, Oct 14, 2022 at 8:35 PM Chris Egerton <chr...@aiven.io.invalid>
wrote:

> Hi Ashwin,
>
> Thanks for your thoughts. Regarding your questions:
>
> 1. The response would show the offsets that are visible to the source
> connector, so it would combine the contents of the two topics, giving
> priority to offsets present in the connector-specific topic. I'm imagining
> a follow-up question that some people may have in response to that is
> whether we'd want to provide insight into the contents of a single topic at
> a time. It may be useful to be able to see this information in order to
> debug connector issues or verify that it's safe to stop using a
> connector-specific offsets topic (either explicitly, or implicitly via
> cluster downgrade). What do you think about adding a URL query parameter
> that allows users to dictate which view of the connector's offsets they are
> given in the REST response, with options for the worker's global topic, the
> connector-specific topic, and the combined view of them that the connector
> and its tasks see (which would be the default)? This may be too much for V1
> but it feels like it's at least worth exploring a bit.
>
> 2. There is no option for this at the moment. Reset semantics are extremely
> coarse-grained; for source connectors, we delete all source offsets, and
> for sink connectors, we delete the entire consumer group. I'm hoping this
> will be enough for V1 and that, if there's sufficient demand for it, we can
> introduce a richer API for resetting or even modifying connector offsets in
> a follow-up KIP.
>
> 3. Good eye :) I think it's fine to keep the existing behavior for the
> PAUSED state with the Connector instance, since the primary purpose of the
> Connector is to generate task configs and monitor the external system for
> changes. If there's no chance for tasks to be running anyways, I don't see
> much value in allowing paused connectors to generate new task configs,
> especially since each time that happens a rebalance is triggered and
> there's a non-zero cost to that. What do you think?
>
> Cheers,
>
> Chris
>
> On Fri, Oct 14, 2022 at 12:59 AM Ashwin <apan...@confluent.io.invalid>
> wrote:
>
> > Thanks for KIP Chris - I think this is a useful feature.
> >
> > Can you please elaborate on the following in the KIP -
> >
> > 1. How would the response of GET /connectors/{connector}/offsets look
> like
> > if the worker has both global and connector specific offsets topic ?
> >
> > 2. How can we pass the reset options like shift-by , to-date-time etc.
> > using a REST API like DELETE /connectors/{connector}/offsets ?
> >
> > 3. Today PAUSE operation on a connector invokes its stop method - will
> > there be a change here to reduce confusion with the new proposed STOPPED
> > state ?
> >
> > Thanks,
> > Ashwin
> >
> > On Fri, Oct 14, 2022 at 2:22 AM Chris Egerton <chr...@aiven.io.invalid>
> > wrote:
> >
> > > Hi all,
> > >
> > > I noticed a fairly large gap in the first version of this KIP that I
> > > published last Friday, which has to do with accommodating connectors
> > > that target different Kafka clusters than the one that the Kafka
> Connect
> > > cluster uses for its internal topics and source connectors with
> dedicated
> > > offsets topics. I've since updated the KIP to address this gap, which
> has
> > > substantially altered the design. Wanted to give a heads-up to anyone
> > > that's already started reviewing.
> > >
> > > Cheers,
> > >
> > > Chris
> > >
> > > On Fri, Oct 7, 2022 at 1:29 PM Chris Egerton <chr...@aiven.io> wrote:
> > >
> > > > Hi all,
> > > >
> > > > I'd like to begin discussion on a KIP to add offsets support to the
> > Kafka
> > > > Connect REST API:
> > > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect
> > > >
> > > > Cheers,
> > > >
> > > > Chris
> > > >
> > >
> >
>

Reply via email to