Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-21 Thread Konstantine Karantasis
Randall, I updated the KIP to add your suggestion above and better explain
the few points that were outstanding since last week.

Almog, I agree that 'timestamp' was too generic. I'm ok with
'discoverTimestamp'. I updated the KIP with your naming suggestion.

Best,
Konstantine


On Tue, Jan 21, 2020 at 10:43 AM Almog Gavra wrote:

> Thanks again Konstantine - really excited about this KIP!
>
> I'm about ready to +1 (non-binding) it with just one comment left: What do
> you think about changing the timestamp field to "discoverTimestamp" or
> something like that to indicate that it is the timestamp of the _first_
> time we recorded/discovered this topic. This is important if we later want
> to add a "mostRecentTimestamp" field in the future to distinguish between
> the two.
>
> Almog
>
> On Tue, Jan 21, 2020 at 8:01 AM Randall Hauch  wrote:
>
> > Thanks, Konstantine.
> >
> > One minor request to clarify the following sentence:
> >
> >
> > As soon as a worker detects the addition of a topic to a connector's set
> of
> > active topics, the worker will cease to post update messages to the
> > status.storage.topic for that connector.
> >
> >
> > As it stands, it sounds like the worker will not write *any more active
> > topic records for this or any connectors* to the topic specified by the
> > `status.storage.topic` worker configuration once the worker detects (by
> > reading) a new active topic. I suspect that this is not the intention,
> and
> > that instead it is trying to say that no more messages *for this topic
> and
> > connector*. IOW, something more like:
> >
> >
> > As soon as a worker detects the addition of a topic to a connector's set
> of
> > active topics, the worker will not post to the status.storage.topic
> > additional update records for the connector and this newly-detected
> active
> > topic.
> >
> >
> > Otherwise, this KIP looks great!
> >
> > Best regards,
> >
> > Randall
> >
> > On Fri, Jan 17, 2020 at 8:04 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Hi all,
> > >
> > > I've updated KIP-558 with the following based on our previous
> discussion:
> > >
> > > * Added timestamp to the metadata (the record value).
> > > * The KIP now mentions a metric-based implementation in the Rejected
> > > Alternatives section.
> > > * The record key format is now using the single character ':' as a
> > > separator between topic-${topic name} and connector-${connector name}
> > > * Added a bullet point to mention that the topic storing the new
> > > information can be a partitioned topic.
> > > * The KIP mentions that the feature does not require rebuilding
> > connectors
> > > (no changes in public interfaces/classes).
> > > * Added a security section.
> > > * KIP preserves symmetry with respect to reset between both types of
> > > connectors and keeps reset and config as separate, unrelated endpoints.
> > >
> > > Given than we made significant progress these past few days and only a
> > few
> > > minor improvements in the KIPs text are remaining, I'd like to start
> the
> > > vote today, so that we give this KIP the necessary time (72 hours) to
> > have
> > > a chance to be voted by the KIP deadline next Wednesday, Jan 22nd.
> > > Let's return here, or the main vote thread for any comments (either
> minor
> > > to major).
> > >
> > > Best,
> > > Konstantine
> > >
> > >
> > >
> > > On Fri, Jan 17, 2020 at 3:51 PM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > >
> > > > Thanks for the follow up Chris. Replies below:
> > > >
> > > > On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton <
> > chr...@confluent.io
> > > >
> > > > wrote:
> > > >
> > > >> Thanks, Konstantine. Just a few more questions:
> > > >>
> > > >> > > 2. What is the motivation for the `topic.tracking.allow.reset`
> > > config?
> > > >> Is
> > > >> > > there any anticipated case where it would be useful to have
> topic
> > > >> tracking
> > > >> > > enabled but with resets disabled? We could easily add this
> > > >> configuration
> > > >> > > later if a use case arises, but if we add it now it'll be
> > difficult
> > > to
> > > >> > > remove.
> > > >> > >
> > > >>
> > > >> > The motivation is for operators of a Connect cluster to be able to
> > > >> disable
> > > >> > resetting the history of active topics altogether, while allowing
> at
> > > the
> > > >> > same time to view the active sets.
> > > >>
> > > >> What I was trying to ask was, is there a use case for enabling the
> > > latter
> > > >> but not the former? We should be careful about adding extra worker
> > > configs
> > > >> and unless we can anticipate a reasonable scenario in which this
> would
> > > >> happen, we should err on the side of caution and avoid adding a
> config
> > > >> that
> > > >> would be difficult to remove later but, comparably, much easier to
> > add.
> > > >>
> > > >
> > > > The application use case is the ability to have immutable histories
> of
> > > > topic usage or control when resets are 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-21 Thread Almog Gavra
Thanks again Konstantine - really excited about this KIP!

I'm about ready to +1 (non-binding) it with just one comment left: What do
you think about changing the timestamp field to "discoverTimestamp" or
something like that, to indicate that it is the timestamp of the _first_
time we recorded/discovered this topic? This is important if we later want
to add a "mostRecentTimestamp" field, to distinguish between
the two.

Almog

On Tue, Jan 21, 2020 at 8:01 AM Randall Hauch wrote:

> Thanks, Konstantine.
>
> One minor request to clarify the following sentence:
>
>
> As soon as a worker detects the addition of a topic to a connector's set of
> active topics, the worker will cease to post update messages to the
> status.storage.topic for that connector.
>
>
> As it stands, it sounds like the worker will not write *any more active
> topic records for this or any connectors* to the topic specified by the
> `status.storage.topic` worker configuration once the worker detects (by
> reading) a new active topic. I suspect that this is not the intention, and
> that instead it is trying to say that no more messages *for this topic and
> connector*. IOW, something more like:
>
>
> As soon as a worker detects the addition of a topic to a connector's set of
> active topics, the worker will not post to the status.storage.topic
> additional update records for the connector and this newly-detected active
> topic.
>
>
> Otherwise, this KIP looks great!
>
> Best regards,
>
> Randall
>
> On Fri, Jan 17, 2020 at 8:04 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Hi all,
> >
> > I've updated KIP-558 with the following based on our previous discussion:
> >
> > * Added timestamp to the metadata (the record value).
> > * The KIP now mentions a metric-based implementation in the Rejected
> > Alternatives section.
> > * The record key format is now using the single character ':' as a
> > separator between topic-${topic name} and connector-${connector name}
> > * Added a bullet point to mention that the topic storing the new
> > information can be a partitioned topic.
> > * The KIP mentions that the feature does not require rebuilding
> connectors
> > (no changes in public interfaces/classes).
> > * Added a security section.
> > * KIP preserves symmetry with respect to reset between both types of
> > connectors and keeps reset and config as separate, unrelated endpoints.
> >
> > Given than we made significant progress these past few days and only a
> few
> > minor improvements in the KIPs text are remaining, I'd like to start the
> > vote today, so that we give this KIP the necessary time (72 hours) to
> have
> > a chance to be voted by the KIP deadline next Wednesday, Jan 22nd.
> > Let's return here, or the main vote thread for any comments (either minor
> > to major).
> >
> > Best,
> > Konstantine
> >
> >
> >
> > On Fri, Jan 17, 2020 at 3:51 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > >
> > > Thanks for the follow up Chris. Replies below:
> > >
> > > On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton <
> chr...@confluent.io
> > >
> > > wrote:
> > >
> > >> Thanks, Konstantine. Just a few more questions:
> > >>
> > >> > > 2. What is the motivation for the `topic.tracking.allow.reset`
> > config?
> > >> Is
> > >> > > there any anticipated case where it would be useful to have topic
> > >> tracking
> > >> > > enabled but with resets disabled? We could easily add this
> > >> configuration
> > >> > > later if a use case arises, but if we add it now it'll be
> difficult
> > to
> > >> > > remove.
> > >> > >
> > >>
> > >> > The motivation is for operators of a Connect cluster to be able to
> > >> disable
> > >> > resetting the history of active topics altogether, while allowing at
> > the
> > >> > same time to view the active sets.
> > >>
> > >> What I was trying to ask was, is there a use case for enabling the
> > latter
> > >> but not the former? We should be careful about adding extra worker
> > configs
> > >> and unless we can anticipate a reasonable scenario in which this would
> > >> happen, we should err on the side of caution and avoid adding a config
> > >> that
> > >> would be difficult to remove later but, comparably, much easier to
> add.
> > >>
> > >
> > > The application use case is the ability to have immutable histories of
> > > topic usage or control when resets are allowed and how they are
> performed
> > > (e.g. resets could be allowed briefly during a maintenance phase and
> get
> > > disabled again).
> > > I'm also never thrilled when I add an extra configuration parameter.
> > > However namespacing here will help with the extra cognitive burden.
> > > Similarly the defaults should cover most use cases too.
> > >
> > > > > 5. As far as automatic resets for sink connectors go, I agree with
> > your
> > >> > > reasoning about the inherent asymmetry between sinks and sources,
> > and
> > >> with
> > >> > > the motivation to avoid confusing users by listing
> 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-21 Thread Randall Hauch
Thanks, Konstantine.

One minor request to clarify the following sentence:


As soon as a worker detects the addition of a topic to a connector's set of
active topics, the worker will cease to post update messages to the
status.storage.topic for that connector.


As it stands, it sounds like the worker will not write *any more active
topic records for this or any connectors* to the topic specified by the
`status.storage.topic` worker configuration once the worker detects (by
reading) a new active topic. I suspect that this is not the intention, and
that instead it is trying to say that no more messages will be posted *for
this topic and connector*. IOW, something more like:


As soon as a worker detects the addition of a topic to a connector's set of
active topics, the worker will not post to the status.storage.topic
additional update records for the connector and this newly-detected active
topic.
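
To make the difference between the two readings concrete, here is a minimal
sketch of the per-(connector, topic) behavior in the proposed wording. It is
an illustration only, not the Connect implementation: the class name, method
names and the `publish` callback (a stand-in for writing a record to the
topic configured by status.storage.topic) are all hypothetical.

```python
class ActiveTopicTracker:
    """Hypothetical worker-side helper; not the actual Connect code."""

    def __init__(self, publish):
        # publish(connector, topic) stands in for posting an update record
        # to the topic configured by status.storage.topic (assumption).
        self._publish = publish
        self._known = set()  # (connector, topic) pairs already detected

    def on_record_read(self, connector, topic):
        # The worker detects a new active topic by reading the status topic.
        self._known.add((connector, topic))

    def on_topic_used(self, connector, topic):
        # Suppress further posts only for this exact (connector, topic) pair.
        if (connector, topic) in self._known:
            return False
        self._publish(connector, topic)
        return True


published = []
tracker = ActiveTopicTracker(lambda c, t: published.append((c, t)))
tracker.on_topic_used("sink-a", "foo")    # posted: pair not yet detected
tracker.on_record_read("sink-a", "foo")   # worker reads the record back
tracker.on_topic_used("sink-a", "foo")    # suppressed: pair already detected
tracker.on_topic_used("sink-a", "bar")    # posted: same connector, new topic
tracker.on_topic_used("sink-b", "foo")    # posted: different connector
```

Under the original sentence one could expect the last two calls to be
suppressed as well, which is the reading I suspect is not intended.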


Otherwise, this KIP looks great!

Best regards,

Randall

On Fri, Jan 17, 2020 at 8:04 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all,
>
> I've updated KIP-558 with the following based on our previous discussion:
>
> * Added timestamp to the metadata (the record value).
> * The KIP now mentions a metric-based implementation in the Rejected
> Alternatives section.
> * The record key format is now using the single character ':' as a
> separator between topic-${topic name} and connector-${connector name}
> * Added a bullet point to mention that the topic storing the new
> information can be a partitioned topic.
> * The KIP mentions that the feature does not require rebuilding connectors
> (no changes in public interfaces/classes).
> * Added a security section.
> * KIP preserves symmetry with respect to reset between both types of
> connectors and keeps reset and config as separate, unrelated endpoints.
>
> Given that we made significant progress these past few days and only a few
> minor improvements to the KIP's text remain, I'd like to start the
> vote today, so that we give this KIP the necessary time (72 hours) to have
> a chance to be voted on by the KIP deadline next Wednesday, Jan 22nd.
> Let's return here, or to the main vote thread, for any comments (minor
> or major).
>
> Best,
> Konstantine
>
>
>
> On Fri, Jan 17, 2020 at 3:51 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> >
> > Thanks for the follow up Chris. Replies below:
> >
> > On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton  >
> > wrote:
> >
> >> Thanks, Konstantine. Just a few more questions:
> >>
> >> > > 2. What is the motivation for the `topic.tracking.allow.reset`
> config?
> >> Is
> >> > > there any anticipated case where it would be useful to have topic
> >> tracking
> >> > > enabled but with resets disabled? We could easily add this
> >> configuration
> >> > > later if a use case arises, but if we add it now it'll be difficult
> to
> >> > > remove.
> >> > >
> >>
> >> > The motivation is for operators of a Connect cluster to be able to
> >> disable
> >> > resetting the history of active topics altogether, while allowing at
> the
> >> > same time to view the active sets.
> >>
> >> What I was trying to ask was, is there a use case for enabling the
> latter
> >> but not the former? We should be careful about adding extra worker
> configs
> >> and unless we can anticipate a reasonable scenario in which this would
> >> happen, we should err on the side of caution and avoid adding a config
> >> that
> >> would be difficult to remove later but, comparably, much easier to add.
> >>
> >
> > The application use case is the ability to have immutable histories of
> > topic usage or control when resets are allowed and how they are performed
> > (e.g. resets could be allowed briefly during a maintenance phase and get
> > disabled again).
> > I'm also never thrilled when I add an extra configuration parameter.
> > However namespacing here will help with the extra cognitive burden.
> > Similarly the defaults should cover most use cases too.
> >
> > > > 5. As far as automatic resets for sink connectors go, I agree with
> your
> >> > > reasoning about the inherent asymmetry between sinks and sources,
> and
> >> with
> >> > > the motivation to avoid confusing users by listing
> no-longer-consumed
> >> > > topics in the active topics for a sink connector. I think that this
> >> > > asymmetry is worth avoiding a scenario where a connector is
> >> reconfigured to
> >> > > only consume from topic "foo" but, from a prior configuration, topic
> >> "bar"
> >> > > is still listed in its active topics.
> >> > > I do want to request clarification on the meaning of the phrase "any
> >> topics
> >> > > no longer consumed" as used under the header "Restarting,
> >> reconfiguring
> >> or
> >> > > deleting a connector". Does this mean that the current set of active
> >> topics
> >> > > for the connector will be filtered and any that are longer contained
> >> in
> >> the
> >> > > sink connector's "topics" config or matched by its 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-17 Thread Konstantine Karantasis
Hi all,

I've updated KIP-558 with the following based on our previous discussion:

* Added timestamp to the metadata (the record value).
* The KIP now mentions a metric-based implementation in the Rejected
Alternatives section.
* The record key format is now using the single character ':' as a
separator between topic-${topic name} and connector-${connector name}
* Added a bullet point to mention that the topic storing the new
information can be a partitioned topic.
* The KIP mentions that the feature does not require rebuilding connectors
(no changes in public interfaces/classes).
* Added a security section.
* KIP preserves symmetry with respect to reset between both types of
connectors and keeps reset and config as separate, unrelated endpoints.
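
For readers who want to see the key layout from the third bullet spelled out,
a rough sketch follows. Only the key shape (topic-${topic name}, then ':',
then connector-${connector name}) and the presence of a timestamp in the
record value come from the list above. The helper names and the JSON layout
of the value are assumptions made for illustration and may not match the KIP.

```python
import json
import time

KEY_SEPARATOR = ":"


def make_key(topic, connector):
    # Key shape taken from the bullet above: topic-<topic>:connector-<connector>
    return f"topic-{topic}{KEY_SEPARATOR}connector-{connector}"


def parse_key(key):
    # Kafka topic names cannot contain ':', so the first ':' ends the topic part.
    topic_part, sep, connector_part = key.partition(KEY_SEPARATOR)
    if (not sep or not topic_part.startswith("topic-")
            or not connector_part.startswith("connector-")):
        raise ValueError(f"not an active-topic record key: {key!r}")
    return topic_part[len("topic-"):], connector_part[len("connector-"):]


def make_value(timestamp_ms=None):
    # Hypothetical value layout: a JSON object carrying only the timestamp.
    if timestamp_ms is None:
        timestamp_ms = int(time.time() * 1000)
    return json.dumps({"timestamp": timestamp_ms})


key = make_key("foo", "my-sink")
topic, connector = parse_key(key)
value = make_value(1579305600000)
```

A single-character separator keeps parsing to one split on the first ':',
which still works if the connector name itself happens to contain a ':'.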

Given that we made significant progress these past few days and only a few
minor improvements to the KIP's text remain, I'd like to start the
vote today, so that we give this KIP the necessary time (72 hours) to have
a chance to be voted on by the KIP deadline next Wednesday, Jan 22nd.
Let's return here, or to the main vote thread, for any comments (minor
or major).

Best,
Konstantine



On Fri, Jan 17, 2020 at 3:51 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

>
> Thanks for the follow up Chris. Replies below:
>
> On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton 
> wrote:
>
>> Thanks, Konstantine. Just a few more questions:
>>
>> > > 2. What is the motivation for the `topic.tracking.allow.reset` config?
>> Is
>> > > there any anticipated case where it would be useful to have topic
>> tracking
>> > > enabled but with resets disabled? We could easily add this
>> configuration
>> > > later if a use case arises, but if we add it now it'll be difficult to
>> > > remove.
>> > >
>>
>> > The motivation is for operators of a Connect cluster to be able to
>> disable
>> > resetting the history of active topics altogether, while allowing at the
>> > same time to view the active sets.
>>
>> What I was trying to ask was, is there a use case for enabling the latter
>> but not the former? We should be careful about adding extra worker configs
>> and unless we can anticipate a reasonable scenario in which this would
>> happen, we should err on the side of caution and avoid adding a config
>> that
>> would be difficult to remove later but, comparably, much easier to add.
>>
>
> The application use case is the ability to have immutable histories of
> topic usage or control when resets are allowed and how they are performed
> (e.g. resets could be allowed briefly during a maintenance phase and get
> disabled again).
> I'm also never thrilled when I add an extra configuration parameter.
> However namespacing here will help with the extra cognitive burden.
> Similarly the defaults should cover most use cases too.
>
> > > 5. As far as automatic resets for sink connectors go, I agree with your
>> > > reasoning about the inherent asymmetry between sinks and sources, and
>> with
>> > > the motivation to avoid confusing users by listing no-longer-consumed
>> > > topics in the active topics for a sink connector. I think that this
>> > > asymmetry is worth avoiding a scenario where a connector is
>> reconfigured to
>> > > only consume from topic "foo" but, from a prior configuration, topic
>> "bar"
>> > > is still listed in its active topics.
>> > > I do want to request clarification on the meaning of the phrase "any
>> topics
>> > > no longer consumed" as used under the header "Restarting,
>> reconfiguring
>> or
>> > > deleting a connector". Does this mean that the current set of active
>> topics
>> > > for the connector will be filtered and any that are longer contained
>> in
>> the
>> > > sink connector's "topics" config or matched by its "topics.regex"
>> config
>> > > will be removed, or does it mean that all topics will be removed and
>> then
>> > > the active topics list will be repopulated as records are consumed
>> from
>> new
>> > > topics?
>> > >
>>
>> > The intention was to imply the former. But based on Randall's comment
>> > above, I'm changing the KIP to include a reset parameter in the PUT
>> > /connectors/{name}/config endpoint
>> > In this case, the reset will be a complete reset for both source and
>> sink
>> > connectors. This will help keeping the behavior symmetric between the
>> two
>> > connector types.
>>
>> I did see Randall's suggestion, but I was hoping we could retain some more
>> intelligent behavior. Two things I'd like for us to avoid if possible:
>>
>> - Sinks consuming from infrequently-written topics unnecessarily dropping
>> those topics, either as part of an explicit reset or an implicit one
>> - Sinks listing a topic in their active topics list that they are, in
>> reality, no longer consuming from
>>
>> Intelligently filtering out no-longer-consumed topics from a sink
>> connector's active topics list (instead of blanket resetting, or not
>> resetting at all) would prevent both of those from happening. We could
>> expand the 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-17 Thread Konstantine Karantasis
Thanks for the follow up Chris. Replies below:

On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton 
wrote:

> Thanks, Konstantine. Just a few more questions:
>
> > > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is
> > > there any anticipated case where it would be useful to have topic tracking
> > > enabled but with resets disabled? We could easily add this configuration
> > > later if a use case arises, but if we add it now it'll be difficult to
> > > remove.
> > >
>
> > The motivation is for operators of a Connect cluster to be able to disable
> > resetting the history of active topics altogether, while allowing at the
> > same time to view the active sets.
>
> What I was trying to ask was, is there a use case for enabling the latter
> but not the former? We should be careful about adding extra worker configs
> and unless we can anticipate a reasonable scenario in which this would
> happen, we should err on the side of caution and avoid adding a config that
> would be difficult to remove later but, comparably, much easier to add.
>

The application use case is the ability to have immutable histories of
topic usage, or to control when resets are allowed and how they are performed
(e.g. resets could be allowed briefly during a maintenance phase and then get
disabled again).
I'm also never thrilled when I add an extra configuration parameter.
However, namespacing here will help with the extra cognitive burden.
Similarly, the defaults should cover most use cases too.

> > > 5. As far as automatic resets for sink connectors go, I agree with your
> > > reasoning about the inherent asymmetry between sinks and sources, and with
> > > the motivation to avoid confusing users by listing no-longer-consumed
> > > topics in the active topics for a sink connector. I think that this
> > > asymmetry is worth avoiding a scenario where a connector is reconfigured to
> > > only consume from topic "foo" but, from a prior configuration, topic "bar"
> > > is still listed in its active topics.
> > > I do want to request clarification on the meaning of the phrase "any topics
> > > no longer consumed" as used under the header "Restarting, reconfiguring or
> > > deleting a connector". Does this mean that the current set of active topics
> > > for the connector will be filtered and any that are no longer contained in
> > > the sink connector's "topics" config or matched by its "topics.regex" config
> > > will be removed, or does it mean that all topics will be removed and then
> > > the active topics list will be repopulated as records are consumed from new
> > > topics?
> > >
>
> > The intention was to imply the former. But based on Randall's comment
> > above, I'm changing the KIP to include a reset parameter in the PUT
> > /connectors/{name}/config endpoint
> > In this case, the reset will be a complete reset for both source and sink
> > connectors. This will help keeping the behavior symmetric between the two
> > connector types.
>
> I did see Randall's suggestion, but I was hoping we could retain some more
> intelligent behavior. Two things I'd like for us to avoid if possible:
>
> - Sinks consuming from infrequently-written topics unnecessarily dropping
> those topics, either as part of an explicit reset or an implicit one
> - Sinks listing a topic in their active topics list that they are, in
> reality, no longer consuming from
>
> Intelligently filtering out no-longer-consumed topics from a sink
> connector's active topics list (instead of blanket resetting, or not
> resetting at all) would prevent both of those from happening. We could
> expand the proposed reset parameter from a boolean to a three-option
> parameter with "none" (don't reset the topics list), "all" (reset the
> entire topics list), and "infer" (reset all topics if for a source, or
> intelligently filter out no-longer-consumed topics if for a sink).
>

I don't see significant advantages that would justify the complexity that
the three-value query parameter would introduce.
Overall, resetting is included for convenience and is not essential to the
main objective of this KIP, which is to track the topics used by a connector
during its lifetime.
I think it's desirable to strike a good balance between the objective of
tracking the topics used by connectors and keeping things simple.
Given that the general programming model that Kafka Connect supports is
that of continuous streams of events, representing active topics as either
the topics that a connector has used since it was first created or as the
topics that have been actively used since the latest reset is sufficient to
cover a large majority of use cases.

Furthermore, I find future KIPs that add features to topic tracking
preferable to future KIPs that try to remedy this first KIP with
adjustments, simplifications and deprecation of features.
Therefore, the query parameter, which I'll add to the KIP shortly, can
indeed be represented as a boolean. Absence 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-17 Thread Christopher Egerton
Thanks, Konstantine. Just a few more questions:

> > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is
> > there any anticipated case where it would be useful to have topic tracking
> > enabled but with resets disabled? We could easily add this configuration
> > later if a use case arises, but if we add it now it'll be difficult to
> > remove.
> >

> The motivation is for operators of a Connect cluster to be able to disable
> resetting the history of active topics altogether, while allowing at the
> same time to view the active sets.

What I was trying to ask was, is there a use case for enabling the latter
but not the former? We should be careful about adding extra worker configs
and unless we can anticipate a reasonable scenario in which this would
happen, we should err on the side of caution and avoid adding a config that
would be difficult to remove later but, comparably, much easier to add.


> > 5. As far as automatic resets for sink connectors go, I agree with your
> > reasoning about the inherent asymmetry between sinks and sources, and with
> > the motivation to avoid confusing users by listing no-longer-consumed
> > topics in the active topics for a sink connector. I think that this
> > asymmetry is worth avoiding a scenario where a connector is reconfigured to
> > only consume from topic "foo" but, from a prior configuration, topic "bar"
> > is still listed in its active topics.
> > I do want to request clarification on the meaning of the phrase "any topics
> > no longer consumed" as used under the header "Restarting, reconfiguring or
> > deleting a connector". Does this mean that the current set of active topics
> > for the connector will be filtered and any that are no longer contained in
> > the sink connector's "topics" config or matched by its "topics.regex" config
> > will be removed, or does it mean that all topics will be removed and then
> > the active topics list will be repopulated as records are consumed from new
> > topics?
> >

> The intention was to imply the former. But based on Randall's comment
> above, I'm changing the KIP to include a reset parameter in the PUT
> /connectors/{name}/config endpoint
> In this case, the reset will be a complete reset for both source and sink
> connectors. This will help keeping the behavior symmetric between the two
> connector types.

I did see Randall's suggestion, but I was hoping we could retain some more
intelligent behavior. Two things I'd like for us to avoid if possible:

- Sinks consuming from infrequently-written topics unnecessarily dropping
those topics, either as part of an explicit reset or an implicit one
- Sinks listing a topic in their active topics list that they are, in
reality, no longer consuming from

Intelligently filtering out no-longer-consumed topics from a sink
connector's active topics list (instead of blanket resetting, or not
resetting at all) would prevent both of those from happening. We could
expand the proposed reset parameter from a boolean to a three-option
parameter with "none" (don't reset the topics list), "all" (reset the
entire topics list), and "infer" (reset all topics if for a source, or
intelligently filter out no-longer-consumed topics if for a sink).
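
As a rough sketch of what I have in mind for the three options (the function
name and signature are hypothetical, and treating "topics.regex" as a
full-string match on the topic name is an assumption on my part):

```python
import re


def reset_active_topics(active, mode, connector_type, topics=(), topics_regex=None):
    """Return the active topic set that remains after a reset in the given mode."""
    if mode == "none":
        # Don't reset the topics list.
        return set(active)
    if mode == "all":
        # Reset the entire topics list.
        return set()
    if mode == "infer":
        if connector_type == "source":
            # Sources get a complete reset.
            return set()
        # Sinks keep only topics still covered by "topics" or "topics.regex".
        pattern = re.compile(topics_regex) if topics_regex else None
        return {
            t for t in active
            if t in topics
            or (pattern is not None and pattern.fullmatch(t) is not None)
        }
    raise ValueError(f"unknown reset mode: {mode!r}")


# A sink reconfigured from topics "foo,bar" to just "foo":
remaining = reset_active_topics({"foo", "bar"}, "infer", "sink", topics=["foo"])
```

With "infer", an infrequently-written topic that is still in the sink's
config survives the reset, and a topic dropped from the config does not.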

Independent of your thoughts on the above, what will the default value for
the newly-proposed parameter be? If resets are performed by default, the
first scenario I outlined would become possible; if not, then the second
would become possible. I'd lean towards performing them by default but
would be interested in others' thoughts. (If the proposed "infer" value
were the default, neither scenario would be an issue).

Cheers,

Chris

On Fri, Jan 17, 2020 at 2:01 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hey Chris! Thanks for the comments. Answers inline below:
>
> On Fri, Jan 17, 2020 at 11:47 AM Christopher Egerton 
> wrote:
>
> > Hi Konstantine,
> >
> > Thanks for the KIP! There's been a lot of productive discussion so far so
> > I'll try to keep my remarks brief.
> >
> > 1. As far as resetting the active topics for a connector goes, it's noted
> > in the KIP that this can be done for a deleted connector. Can this also
> be
> > done for connectors that were never created to begin with? What would the
> > behavior be in this case? (Can this be clarified in the KIP?)
> >
>
> Indeed, the intention is to keep reset as an independent and idempotent
> method.
> Keep in mind that a tombstone will be written to the topic if the in-memory
> view (of active topics) of the worker that serves the request contains this
> connector.
> This should at least prevent fake reset requests from filling up the topic
> with tombstone messages.
>
>
>
> > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is
> > there any anticipated case where it would be useful to have topic
> tracking
> > enabled but with resets disabled? We could easily add this configuration
> > later if a use case arises, but if we add it 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-17 Thread Konstantine Karantasis
Hey Chris! Thanks for the comments. Answers inline below:

On Fri, Jan 17, 2020 at 11:47 AM Christopher Egerton 
wrote:

> Hi Konstantine,
>
> Thanks for the KIP! There's been a lot of productive discussion so far so
> I'll try to keep my remarks brief.
>
> 1. As far as resetting the active topics for a connector goes, it's noted
> in the KIP that this can be done for a deleted connector. Can this also be
> done for connectors that were never created to begin with? What would the
> behavior be in this case? (Can this be clarified in the KIP?)
>

Indeed, the intention is to keep reset as an independent and idempotent
method.
Keep in mind that a tombstone will be written to the topic only if the
in-memory view (of active topics) of the worker that serves the request
contains this connector.
This should at least prevent fake reset requests from filling up the topic
with tombstone messages.
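
A simplified sketch of that guard is below. It is only an illustration under
a few assumptions: the names are hypothetical, the view is modeled as a plain
dict from connector name to its set of active topics, one tombstone (a record
with a null value) is written per topic-${topic name}:connector-${connector
name} key, and the view is updated immediately rather than by reading the
tombstones back from the topic.

```python
class StatusTopicStub:
    """Stand-in for a producer writing to the status topic (hypothetical)."""

    def __init__(self):
        self.records = []

    def send(self, key, value):
        self.records.append((key, value))


def reset_connector(view, status_topic, connector):
    """Reset active topics for a connector; return the number of tombstones written."""
    topics = view.pop(connector, None)
    if not topics:
        # Unknown, deleted-and-already-reset, or never-created connector:
        # nothing is written, so repeated or bogus resets are no-ops.
        return 0
    for topic in sorted(topics):
        # A tombstone is a record with a null value for an existing key.
        status_topic.send(f"topic-{topic}:connector-{connector}", None)
    return len(topics)


view = {"sink-a": {"foo", "bar"}}
log = StatusTopicStub()
first = reset_connector(view, log, "sink-a")           # writes two tombstones
second = reset_connector(view, log, "sink-a")          # idempotent: writes nothing
bogus = reset_connector(view, log, "never-created")    # writes nothing
```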



> 2. What is the motivation for the `topic.tracking.allow.reset` config? Is
> there any anticipated case where it would be useful to have topic tracking
> enabled but with resets disabled? We could easily add this configuration
> later if a use case arises, but if we add it now it'll be difficult to
> remove.
>

The motivation is for operators of a Connect cluster to be able to disable
resetting the history of active topics altogether, while allowing at the
same time to view the active sets.


> 3. Nit - the JSON formatting in the value format/value example columns
> under the "Format of the new status record" heading is a little confusing.
> Assuming the top-level value is meant to be an object, it should be wrapped
> in braces ("{" and "}").
>

Good catch. Fixed.


> 4. The KIP focuses heavily on the use of the status topic for storage of
> connector topic information, but presumably we'd also want this information
> to be available in standalone mode. If this is the case, it'd be nice to
> tweak the language to refer explicitly to distributed mode when discussing
> the changes to the status topic and note (probably just in one place) that
> similar functionality will also be added to the standalone worker's
> in-memory status store.
>

It's true that the design is detailed w.r.t. what should happen in the
KafkaStatusBackingStore which is a Kafka-based implementation of the
StatusBackingStore interface. This is intentional because this
implementation influences and informs the semantics of topic tracking. I'd
prefer not to make the language too abstract here. A KIP is not exactly a
standard and KIPs often discuss the impact of implementation in behavior
(this KIP is a good example). But I'm happy to add a note to mention that
these semantics will apply to standalone mode too.


> 5. As far as automatic resets for sink connectors go, I agree with your
> reasoning about the inherent asymmetry between sinks and sources, and with
> the motivation to avoid confusing users by listing no-longer-consumed
> topics in the active topics for a sink connector. I think that this
> asymmetry is worth avoiding a scenario where a connector is reconfigured to
> only consume from topic "foo" but, from a prior configuration, topic "bar"
> is still listed in its active topics.
> I do want to request clarification on the meaning of the phrase "any topics
> no longer consumed" as used under the header "Restarting, reconfiguring or
> deleting a connector". Does this mean that the current set of active topics
> for the connector will be filtered and any that are no longer contained in the
> sink connector's "topics" config or matched by its "topics.regex" config
> will be removed, or does it mean that all topics will be removed and then
> the active topics list will be repopulated as records are consumed from new
> topics?
>

The intention was to imply the former. But based on Randall's comment
above, I'm changing the KIP to include a reset parameter in the PUT
/connectors/{name}/config endpoint.
In this case, the reset will be a complete reset for both source and sink
connectors. This will help keep the behavior symmetric between the two
connector types.

Best,
Konstantine


> Cheers,
>
> Chris
>
> On Thu, Jan 16, 2020 at 1:51 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Thanks for new comments Randall. Following up with my replies inline
> below.
> > I'll also go ahead and update the KIP with the suggestions that are
> > outstanding right now and post a summary of the changes.
> >
> > On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch  wrote:
> >
> > > My responses are inline:
> > >
> > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> > > konstant...@confluent.io> wrote:
> > >
> > > > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll
> > reply
> > > in
> > > > separate emails, in order.
> > > >
> > > > First, to Randall's comments, I'm replying below with a reference to
> > the
> > > > comment number:
> > > >
> > > > 1. Although I can imagine we'd be interested in adding additional
> > > metadata
> 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-17 Thread Christopher Egerton
Hi Konstantine,

Thanks for the KIP! There's been a lot of productive discussion so far so
I'll try to keep my remarks brief.

1. As far as resetting the active topics for a connector goes, it's noted
in the KIP that this can be done for a deleted connector. Can this also be
done for connectors that were never created to begin with? What would the
behavior be in this case? (Can this be clarified in the KIP?)

2. What is the motivation for the `topic.tracking.allow.reset` config? Is
there any anticipated case where it would be useful to have topic tracking
enabled but with resets disabled? We could easily add this configuration
later if a use case arises, but if we add it now it'll be difficult to
remove.

3. Nit - the JSON formatting in the value format/value example columns
under the "Format of the new status record" heading is a little confusing.
Assuming the top-level value is meant to be an object, it should be wrapped
in braces ("{" and "}").

4. The KIP focuses heavily on the use of the status topic for storage of
connector topic information, but presumably we'd also want this information
to be available in standalone mode. If this is the case, it'd be nice to
tweak the language to refer explicitly to distributed mode when discussing
the changes to the status topic and note (probably just in one place) that
similar functionality will also be added to the standalone worker's
in-memory status store.

5. As far as automatic resets for sink connectors go, I agree with your
reasoning about the inherent asymmetry between sinks and sources, and with
the motivation to avoid confusing users by listing no-longer-consumed
topics in the active topics for a sink connector. I think that this
asymmetry is worth avoiding a scenario where a connector is reconfigured to
only consume from topic "foo" but, from a prior configuration, topic "bar"
is still listed in its active topics.
I do want to request clarification on the meaning of the phrase "any topics
no longer consumed" as used under the header "Restarting, reconfiguring or
deleting a connector". Does this mean that the current set of active topics
for the connector will be filtered and any that are no longer contained in the
sink connector's "topics" config or matched by its "topics.regex" config
will be removed, or does it mean that all topics will be removed and then
the active topics list will be repopulated as records are consumed from new
topics?

Cheers,

Chris

On Thu, Jan 16, 2020 at 1:51 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Thanks for new comments Randall. Following up with my replies inline below.
> I'll also go ahead and update the KIP with the suggestions that are
> outstanding right now and post a summary of the changes.
>
> On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch  wrote:
>
> > My responses are inline:
> >
> > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> > > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll
> reply
> > in
> > > separate emails, in order.
> > >
> > > First, to Randall's comments, I'm replying below with a reference to
> the
> > > comment number:
> > >
> > > 1. Although I can imagine we'd be interested in adding additional
> > metadata
> > > in the record value, I didn't see the need for a timestamp in this
> first
> > > draft.
> > > > Now that you mention it, the way I'd interpret a timestamp in the
> connector
> > > status record value would be as an approximation of since when this
> > > connector has been using this topic.
> > > Happy to add this if we think this info is useful. Of course, accuracy
> of
> > > this information depends on message retention in Kafka and on how long
> > the
> > > workers have been running without a restart, so this might make this
> > > approximation less useful if it gets recomputed from time to time.
> > > To your reference in "Recording active topics" I'll reply below,
> because
> > > that's Tom's question too.
> > >
> >
> > Makes sense that the timestamp in the connector is the (approximate) time
> > that the connector has been using the topic. I do think it's worth adding
> > in the record value (not relying upon Kafka record timestamp).
> >
> > Regarding "message retention", by default Connect creates the status
> topic
> > with compaction but no deletion policy, which means infinite retention.
> > Don't several things become problematic if finite retention is used on
> the
> > status topic, or do we need to worry about this for the active topic
> > records? Do we need to periodically rewrite all of the active topic
> > records? If so, we could just write new records using the original
> > timestamp as originally read by the worker. If the worker does
> periodically
> > (maybe just on task startup) rewrite the active topic records, then we'd
> > have to be sure about the semantics of and interplay with concurrent
> > explicit "reset" calls.
> >
>
> Good point. These topics are configured to have 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-16 Thread Konstantine Karantasis
Thanks for new comments Randall. Following up with my replies inline below.
I'll also go ahead and update the KIP with the suggestions that are
outstanding right now and post a summary of the changes.

On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch  wrote:

> My responses are inline:
>
> On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
> > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll reply
> in
> > separate emails, in order.
> >
> > First, to Randall's comments, I'm replying below with a reference to the
> > comment number:
> >
> > 1. Although I can imagine we'd be interested in adding additional
> metadata
> > in the record value, I didn't see the need for a timestamp in this first
> > draft.
> > Now that you mention it, the way I'd interpret a timestamp in the connector
> > status record value would be as an approximation of since when this
> > connector has been using this topic.
> > Happy to add this if we think this info is useful. Of course, accuracy of
> > this information depends on message retention in Kafka and on how long
> the
> > workers have been running without a restart, so this might make this
> > approximation less useful if it gets recomputed from time to time.
> > To your reference in "Recording active topics" I'll reply below, because
> > that's Tom's question too.
> >
>
> Makes sense that the timestamp in the connector is the (approximate) time
> that the connector has been using the topic. I do think it's worth adding
> in the record value (not relying upon Kafka record timestamp).
>
> Regarding "message retention", by default Connect creates the status topic
> with compaction but no deletion policy, which means infinite retention.
> Don't several things become problematic if finite retention is used on the
> status topic, or do we need to worry about this for the active topic
> records? Do we need to periodically rewrite all of the active topic
> records? If so, we could just write new records using the original
> timestamp as originally read by the worker. If the worker does periodically
> (maybe just on task startup) rewrite the active topic records, then we'd
> have to be sure about the semantics of and interplay with concurrent
> explicit "reset" calls.
>

Good point. These topics are configured to have infinite retention. I'll
add the timestamp as type 'long'.


>
> >
> > 2. I'll explain with an example, that maybe is worth adding to the KIP
> > because what's expected to happen might not be as obvious as I thought
> when
> > a new topic is recorded.
> > Let's say we have two workers, W1 and W2, each running two worker tasks
> T11
> > T12 and T21 T22 respectively associated with a connector C1. All tasks
> will
> > run producers that will produce records to the same topic, "test-topic".
> > When the connector starts, both workers track this connector's set of
> > active topics as empty. Given the absence of synchronization (that's
> good)
> > in how this information is recorded and persisted in the status topic,
> all
> > four tasks might race to record status messages:
> >
> > For example:
> >
> > T11, running at worker W1, will send Kafka records with:
> > key: topic-test-topic-connector-C1
> > value: "topic": {  "connector": "some-source",  "task":
> "some-source-TT11",
> >  "name": "test-topic" }
> >
> > and T22, running at worker W2, will send Kafka records with:
> > key: topic-test-topic-connector-C1
> > value: "topic": {  "connector": "some-source",  "task":
> "some-source-TT22",
> >  "name": "test-topic" }
> >
> > (similarly tasks T12 and T21 might send topic status records).
> >
> > These four records (they might not even be four but there's going to be
> at
> > least one) may be written in any order. Because the topic is compacted
> and
> > these records have the same key, eventually only one message will be
> > retained.
> > The task ID of that message will be the ID of the task that wrote last. I
> > can see this being used mostly for troubleshooting.
> >
>
> Thanks for the clarification. Might be good to clarify the language a bit
> more, though I'm not convinced an example is really needed.
>

I'll try to see how they both fit. Sure.
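
A small sketch may make the race in the example above easier to follow. It is written in Python and models compaction as "last write per key wins"; the function name, the simplified task IDs, and the record values are illustrative assumptions rather than the actual record format.

```python
from typing import Dict, List, Tuple


def compact(records: List[Tuple[str, dict]]) -> Dict[str, dict]:
    """Keep only the latest value per key, as log compaction eventually does."""
    latest: Dict[str, dict] = {}
    for key, value in records:
        latest[key] = value  # a later record with the same key replaces earlier ones
    return latest


key = "topic-test-topic-connector-C1"
# One possible interleaving of the four racing tasks; any order is allowed.
writes = [
    (key, {"connector": "C1", "task": "T11", "name": "test-topic"}),
    (key, {"connector": "C1", "task": "T22", "name": "test-topic"}),
    (key, {"connector": "C1", "task": "T12", "name": "test-topic"}),
    (key, {"connector": "C1", "task": "T21", "name": "test-topic"}),
]
retained = compact(writes)
```

Whatever the interleaving, a single record survives for the key, and its task field names whichever task wrote last.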


>
>
> >
> > 3. I believe across the whole KIP, when I'm referring to the task
> entity, I
> > imply the worker task. Not the user code that is running as
> implementation
> > of the SourceTask or SinkTask abstract classes. Didn't want to increase
> > complexity by referring to a task as worker task.
> > But I see your point and I'm going to prefer the terms "worker" and
> "worker
> > task" to highlight that it's the framework that is aware of this feature
> > and not the user code.
> >
>
> Thank you.
>

  +1


>
>
> >
> > 4. I assumed that absence of changes to the public API would indicate
> that
> > these interfaces/abstract classes remain unchanged. But it's definitely
> > worth mentioning that explicitly.
> >
>
> Thanks!
>
> +1


>
> >
> > 5. That is correct. My 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-16 Thread Konstantine Karantasis
Thanks for the follow up Tom. Replies inline below this time:

On Thu, Jan 16, 2020 at 1:16 AM Tom Bentley  wrote:

> Hi Konstantine,
>
> > b) I thought hard but briefly about this. Although there's potential for
> > ambiguity if someone inspects the key on its own, I believe that if we
> > combine this information with what we save in the value, the ambiguity is
> > removed. With respect to whether a connector can override another
> > connector's key, I don't think that's possible for any combination of
> > unique topic name and connector name. I hope I'm not missing anything.
> >
>
> I think it is possible for connectors to have conflicting keys:
>
> * topic-name="foo-connector" and connector-name="bar"
> * topic-name="foo" and connector-name="connector-bar"
>
> would both have the key "topic-foo-connector-connector-bar"
>
>
Indeed. Thanks for the pointer Tom. A collision is not as unlikely as I
initially thought. I see roughly two solutions:
a) Use a special character as a delimiter. I initially wanted to avoid
this, but given that topic names have quite restrictive rules with respect
to their allowed character set, the colon character ':' could be used to
separate the topic from the connector name. Connector names are more
flexible, but restrictions in the topic name will suffice. I suggest
keeping prefix 'topic-' and 'connector-' after the delimiter.
b) Use json to encode the key.

In order to be more consistent with the format of existing keys in the
status topic, I'm suggesting to adopt option (a) - use ':' as a separator -
at the moment. If at some point we decide to start using json for the
record keys of internal Connect topics, we should do it in a more concerted
way and in a separate KIP.
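
The collision Tom describes, and how option (a) avoids it, can be checked with a short Python sketch. The helper names are hypothetical, and the parsing step assumes, as noted above, that topic names cannot contain ':' while connector names might.

```python
from typing import Tuple


def hyphen_key(topic: str, connector: str) -> str:
    """Original key layout: ambiguous when names contain '-connector'."""
    return f"topic-{topic}-connector-{connector}"


def colon_key(topic: str, connector: str) -> str:
    """Option (a): ':' separates the topic part from the connector part."""
    return f"topic-{topic}:connector-{connector}"


def parse_colon_key(key: str) -> Tuple[str, str]:
    """Split on the first ':' only, since connector names may contain ':'."""
    topic_part, _, connector_part = key.partition(":")
    return topic_part[len("topic-"):], connector_part[len("connector-"):]


collides = hyphen_key("foo-connector", "bar") == hyphen_key("foo", "connector-bar")
distinct = colon_key("foo-connector", "bar") != colon_key("foo", "connector-bar")
```

Splitting on the first colon is what lets the restriction on topic names alone be sufficient: everything before it must belong to the topic.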



> > c) That means that worker tasks will produce topic status records once
> they
> > detect that the worker they are running on does not include a topic that
> is
> > used by the tasks in the list of active topics for this connector. But
> once
> > the worker adds this topic to the set of active topics for this
> connector,
> > then the worker tasks stop producing those messages. They'll produce one
> > again if the worker stops including that topic in the active topics set
> for
> > some reason. I could improve the wording on the KIP, but I'd also like to
> > know we are on the same page re: the design here.
> >
>
> OK, I understand now. I think it would be helpful to include the example
> you gave in your previous reply.
>

Definitely. Will add the example.
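
As a rough illustration of point (c), the following Python sketch models a worker whose tasks post a topic status record only while the worker's own view does not yet contain the topic. The class and method names are hypothetical; the real worker reads the status topic asynchronously, which is why more than one task may post before the view catches up.

```python
from typing import Dict, List, Set, Tuple


class WorkerSketch:
    """Hypothetical worker-side view of each connector's active topics."""

    def __init__(self) -> None:
        self.active: Dict[str, Set[str]] = {}
        self.posted: List[Tuple[str, str, str]] = []  # (connector, task, topic)

    def maybe_post(self, connector: str, task: str, topic: str) -> bool:
        """Post a status record unless the worker already tracks the topic."""
        if topic in self.active.get(connector, set()):
            return False
        self.posted.append((connector, task, topic))
        return True

    def on_status_record(self, connector: str, topic: str) -> None:
        """Called when the worker reads a topic record back from the status topic."""
        self.active.setdefault(connector, set()).add(topic)


worker = WorkerSketch()
a = worker.maybe_post("C1", "T11", "test-topic")  # not tracked yet: posts
b = worker.maybe_post("C1", "T12", "test-topic")  # still not read back: posts
worker.on_status_record("C1", "test-topic")
c = worker.maybe_post("C1", "T11", "test-topic")  # now tracked: no post
d = worker.maybe_post("C1", "T11", "other-topic") # different topic: posts
```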



> Many thanks,
>
> Tom
>
>
I've updated the KIP to mention the delimiter and using a metric to track
active topics as a rejected alternative.
Following up with the rest of the changes shortly.

Cheers,
Konstantine



> >
> > Let me know if the above addresses your questions.
> > Thanks,
> > Konstantine
> >
> >
> >
> > On Wed, Jan 15, 2020 at 12:05 PM Randall Hauch  wrote:
> >
> > > Almog,
> > >
> > > You raise some interesting questions. Comments inline below.
> > >
> > > On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra 
> wrote:
> > >
> > > > Hi Konstantine,
> > > >
> > > > Thanks for the KIP! This is going to make automatic integration with
> > > > Connect much more powerful.
> > > >
> > > > My thoughts are mostly around freshness of the data and being able to
> > > > expose that to users. Riffing on Randall's timestamp question - have
> we
> > > > considered adding some interval at which point a connector will
> > republish
> > > > any topics that it encounters and update the timestamp? That way we
> > have
> > > > some refreshing mechanism that isn't as powerful as the complete
> reset
> > > > (which may not be practical in many scenarios).
> > > >
> > >
> > > My question about recording the timestamp at which each active topic
> > record
> > > were (infrequently) written was more about making a bit more
> information
> > > available given the current design, and whether recording a bit more
> > > information (for very little additional storage cost and no extra
> runtime
> > > cost) may be worth it if in the future we figure out how to use this
> > > information.
> > >
> > > I think it's more complicated to try to record the history of when
> topics
> > > were most recently used, since that requires recording a lot more
> active
> > > topic records than the current proposal. Besides, it's not unexpected
> > that
> > > source and sink connectors sometimes don't use topics for periods of
> > time.
> > > A sink connector only consumes from a topic when there are additional
> > > records to consume, and a source connector only needs to write to a
> topic
> > > when there is information in the upstream system targeted to that
> topic.
> > An
> > > example of the latter is that most database connectors will write to a
> > > topic for a particular table only when rows in that table have changed.
> > >
> > >
> > > > I also agree with Randall's other point (Would it be better to not
> > > > automatically reset connector's active topics when a sink connector
> 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-16 Thread Tom Bentley
Hi Konstantine,

> b) I thought hard but briefly about this. Although there's potential for
> ambiguity if someone inspects the key on its own, I believe that if we
> combine this information with what we save in the value, the ambiguity is
> removed. With respect to whether a connector can override another
> connector's key, I don't think that's possible for any combination of
> unique topic name and connector name. I hope I'm not missing anything.
>

I think it is possible for connectors to have conflicting keys:

* topic-name="foo-connector" and connector-name="bar"
* topic-name="foo" and connector-name="connector-bar"

would both have the key "topic-foo-connector-connector-bar"


> c) That means that worker tasks will produce topic status records once they
> detect that the worker they are running on does not include a topic that is
> used by the tasks in the list of active topics for this connector. But once
> the worker adds this topic to the set of active topics for this connector,
> then the worker tasks stop producing those messages. They'll produce one
> again if the worker stops including that topic in the active topics set for
> some reason. I could improve the wording on the KIP, but I'd also like to
> know we are on the same page re: the design here.
>

OK, I understand now. I think it would be helpful to include the example
you gave in your previous reply.

Many thanks,

Tom


>
> Let me know if the above addresses your questions.
> Thanks,
> Konstantine
>
>
>
> On Wed, Jan 15, 2020 at 12:05 PM Randall Hauch  wrote:
>
> > Almog,
> >
> > You raise some interesting questions. Comments inline below.
> >
> > On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra  wrote:
> >
> > > Hi Konstantine,
> > >
> > > Thanks for the KIP! This is going to make automatic integration with
> > > Connect much more powerful.
> > >
> > > My thoughts are mostly around freshness of the data and being able to
> > > expose that to users. Riffing on Randall's timestamp question - have we
> > > considered adding some interval at which point a connector will
> republish
> > > any topics that it encounters and update the timestamp? That way we
> have
> > > some refreshing mechanism that isn't as powerful as the complete reset
> > > (which may not be practical in many scenarios).
> > >
> >
> > My question about recording the timestamp at which each active topic
> record
> > was (infrequently) written was more about making a bit more information
> > available given the current design, and whether recording a bit more
> > information (for very little additional storage cost and no extra runtime
> > cost) may be worth it if in the future we figure out how to use this
> > information.
> >
> > I think it's more complicated to try to record the history of when topics
> > were most recently used, since that requires recording a lot more active
> > topic records than the current proposal. Besides, it's not unexpected
> that
> > source and sink connectors sometimes don't use topics for periods of
> time.
> > A sink connector only consumes from a topic when there are additional
> > records to consume, and a source connector only needs to write to a topic
> > when there is information in the upstream system targeted to that topic.
> An
> > example of the latter is that most database connectors will write to a
> > topic for a particular table only when rows in that table have changed.
> >
> >
> > > I also agree with Randall's other point (Would it be better to not
> > > automatically reset connector's active topics when a sink connector is
> > > restarted?). I think keeping the behavior as symmetrical between sink
> and
> > > source connectors is a good idea.
> > >
> > > Lastly, with regards to the API, I can imagine it is also pretty useful
> > to
> > > answer the inverse question: "which connectors write to topic X".
> Perhaps
> > > we can achieve this by letting the users compute it and just expose an
> > API
> > > that returns the entire mapping at once (instead of needing to call the
> > > /connectors/{name}/topics endpoint for each connector).
> >
> >
> > It may be worth considering a method such as /topics that would produce a
> > result that is an aggregate of the potentially many
> > /connectors/{name}/topic responses, especially if the result of the
> /topics
> > is an array of the individual responses from /connectors/{name}/topic.
> The
> > benefit of a single request is that the answer to "which connectors use
> > topic X" can be computed using simple tools such as 'jq'. And, if tooling
> > frequently fetches the active topic information for all connectors,
> > providing the aggregate method would reduce the load on the tooling and
> the
> > server. It may also be relatively easy to implement.
> >
> >
> > > Otherwise, looks good to me! Hits the requirements that I had in mind
> on
> > > the nose.
> > > - Almog
> > >
> > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley 
> wrote:
> > >
> > > > Hi Konstantine,
> > > >
> > > > Thanks for 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Konstantine Karantasis
Hey Almog, thanks for the comments!
Here's my take:

1) I think that an approximate grouping of topics to
highly-active/active/inactive (because a precise one would be too
expensive) seems like something we could leave out of this first version of
topic tracking. Interestingly, as you point out, given a topic of
continuous flow, you may get a similar view by resetting and subsequently
querying the topics endpoint. The way I understand Randall's comment is
that persisting timestamps would have more to do with knowing approximately
when this connector first started using its topics. But given the
implications that topic retention might have, I suggest that we leave
timestamp recording out at this point.

2) As I described in my previous answer above (#6 on my first reply to
Randall's comments), this asymmetry is inherited from the sink connector's
configuration. I'm ok not resetting the topics for both upon
reconfiguration, since this will result in simpler code. But I'd like to
know that we are ok with a sink connector showing topics in its active set
that are not in its current configuration (unless a reset request is
issued).

3) A similar idea crossed my mind while I was thinking how the
"/connectors" endpoint evolved with KIP-465 to show a roll up of the status
of the tasks of all the connectors. However, here what you describe would
probably require an additional top-level "/topics" endpoint and a more
complex filtering based on permissions. I'd suggest punting this feature,
unless people think that is a really nice to have. In the meantime, as you
mention, it is something that can be constructed with consecutive queries
after an application gets the list of connectors running in the Connect
cluster.
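
For reference, the client-side construction mentioned in (3) could look roughly like the Python sketch below. It assumes the application has already collected each connector's active topics (for example with one query per connector) into a plain dictionary; that input shape and the function name are assumptions, not the format of any REST response.

```python
from collections import defaultdict
from typing import Dict, List


def connectors_by_topic(active: Dict[str, List[str]]) -> Dict[str, List[str]]:
    """Invert a connector -> topics mapping into topic -> connectors."""
    inverted = defaultdict(set)
    for connector, topics in active.items():
        for topic in topics:
            inverted[topic].add(connector)
    # Sort names so the result is stable regardless of query order.
    return {topic: sorted(names) for topic, names in inverted.items()}


# Hypothetical result of querying each connector's active topics in turn.
collected = {
    "jdbc-source": ["orders", "customers"],
    "s3-sink": ["orders"],
    "idle-connector": [],
}
by_topic = connectors_by_topic(collected)
```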

Cheers,
Konstantine

On Wed, Jan 15, 2020 at 2:41 PM Randall Hauch  wrote:

> On Wed, Jan 15, 2020 at 4:36 PM Randall Hauch  wrote:
>
> > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> > konstant...@confluent.io> wrote:
> >
> >>
> >> 9. I assumed that partitioning is implied by default, because there's no
> >> requirement for complete ordering of topic status records. But I'll add
> >> this fact as a separate bullet. The status.storage.topic is already a
> >> partitioned topic.
> >>
> >
> > Agreed. I think it'd be sufficient to simply mention that partition will
> > be chosen based upon the active topic records' keys, ensuring that all
> > active topic records for the same connector will be written to the same
> > partition and will be totally ordered.
> >
>
> Well, my previous statement is not quite right. All topic records for the
> same *connector and topic* will be written to the same partition and will
> be totally ordered. But as you pointed out, it doesn't really matter, other
> than that this feature will work with any # of partitions. The new bullet
> you described would be sufficient. :-D
>
>
> >
> >>
> >> I'm following up with the rest of the comments, shortly.
> >> Thanks,
> >> Konstantine
> >>
> >>
> >> On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra  wrote:
> >>
> >> > Hi Konstantine,
> >> >
> >> > Thanks for the KIP! This is going to make automatic integration with
> >> > Connect much more powerful.
> >> >
> >> > My thoughts are mostly around freshness of the data and being able to
> >> > expose that to users. Riffing on Randall's timestamp question - have
> we
> >> > considered adding some interval at which point a connector will
> >> republish
> >> > any topics that it encounters and update the timestamp? That way we
> have
> >> > some refreshing mechanism that isn't as powerful as the complete reset
> >> > (which may not be practical in many scenarios).
> >> >
> >> > I also agree with Randall's other point (Would it be better to not
> >> > automatically reset connector's active topics when a sink connector is
> >> > restarted?). I think keeping the behavior as symmetrical between sink
> >> and
> >> > source connectors is a good idea.
> >> >
> >> > Lastly, with regards to the API, I can imagine it is also pretty
> useful
> >> to
> >> > answer the inverse question: "which connectors write to topic X".
> >> Perhaps
> >> > we can achieve this by letting the users compute it and just expose an
> >> API
> >> > that returns the entire mapping at once (instead of needing to call
> the
> >> > /connectors/{name}/topics endpoint for each connector).
> >> >
> >> > Otherwise, looks good to me! Hits the requirements that I had in mind
> on
> >> > the nose.
> >> > - Almog
> >> >
> >> > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley 
> >> wrote:
> >> >
> >> > > Hi Konstantine,
> >> > >
> >> > > Thanks for the KIP, I can see how it could be useful.
> >> > >
> >> > > a) Did you consider using a metric for this? I don't think it would
> >> > satisfy
> >> > > all the use cases you have in mind, but you could mention it in the
> >> > > rejected alternatives.
> >> > >
> >> > > b) If the topic name contains the string "-connector" then the key
> >> format
> >> > > is ambiguous. This isn't 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Randall Hauch
On Wed, Jan 15, 2020 at 4:36 PM Randall Hauch  wrote:

> On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
>>
>> 9. I assumed that partitioning is implied by default, because there's no
>> requirement for complete ordering of topic status records. But I'll add
>> this fact as a separate bullet. The status.storage.topic is already a
>> partitioned topic.
>>
>
> Agreed. I think it'd be sufficient to simply mention that partition will
> be chosen based upon the active topic records' keys, ensuring that all
> active topic records for the same connector will be written to the same
> partition and will be totally ordered.
>

Well, my previous statement is not quite right. All topic records for the
same *connector and topic* will be written to the same partition and will
be totally ordered. But as you pointed out, it doesn't really matter, other
than that this feature will work with any # of partitions. The new bullet
you described would be sufficient. :-D
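
A short Python sketch of the keyed partitioning point: records with the same key always map to the same partition, for any number of partitions. Kafka's default partitioner hashes the serialized key with murmur2; `zlib.crc32` is used here only as a stand-in, and the key layout is illustrative.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Map a record key to a partition; crc32 stands in for Kafka's murmur2."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


key = "topic-foo:connector-C1"  # one (connector, topic) pair, illustrative layout
first = partition_for(key, 5)
again = partition_for(key, 5)   # same key, same partition, so order is preserved
single = partition_for(key, 1)  # works with any number of partitions
```

Keys for different topics of the same connector hash independently, so they may land on different partitions, which is why ordering is only guaranteed per connector and topic.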


>
>>
>> I'm following up with the rest of the comments, shortly.
>> Thanks,
>> Konstantine
>>
>>
>> On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra  wrote:
>>
>> > Hi Konstantine,
>> >
>> > Thanks for the KIP! This is going to make automatic integration with
>> > Connect much more powerful.
>> >
>> > My thoughts are mostly around freshness of the data and being able to
>> > expose that to users. Riffing on Randall's timestamp question - have we
>> > considered adding some interval at which point a connector will
>> republish
>> > any topics that it encounters and update the timestamp? That way we have
>> > some refreshing mechanism that isn't as powerful as the complete reset
>> > (which may not be practical in many scenarios).
>> >
>> > I also agree with Randall's other point (Would it be better to not
>> > automatically reset connector's active topics when a sink connector is
>> > restarted?). I think keeping the behavior as symmetrical between sink
>> and
>> > source connectors is a good idea.
>> >
>> > Lastly, with regards to the API, I can imagine it is also pretty useful
>> to
>> > answer the inverse question: "which connectors write to topic X".
>> Perhaps
>> > we can achieve this by letting the users compute it and just expose an
>> API
>> > that returns the entire mapping at once (instead of needing to call the
>> > /connectors/{name}/topics endpoint for each connector).
>> >
>> > Otherwise, looks good to me! Hits the requirements that I had in mind on
>> > the nose.
>> > - Almog
>> >
>> > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley 
>> wrote:
>> >
>> > > Hi Konstantine,
>> > >
>> > > Thanks for the KIP, I can see how it could be useful.
>> > >
>> > > a) Did you consider using a metric for this? I don't think it would
>> > satisfy
>> > > all the use cases you have in mind, but you could mention it in the
>> > > rejected alternatives.
>> > >
>> > > b) If the topic name contains the string "-connector" then the key
>> format
>> > > is ambiguous. This isn't necessarily fatal because the value will
>> > > disambiguate, but it could be misleading. Any reason not to just use a
>> > JSON
>> > > key, and simplify the value?
>> > >
>> > > c) I didn't understand this part: "As soon as a worker detects the
>> > addition
>> > > of a topic to a connector's set of active topics, the worker will
>> cease
>> > to
>> > > post update messages to the status.storage.topic for that connector.
>> ".
>> > I'm
>> > > sure I've overlooking something but why is this necessary? Is this
>> were
>> > the
>> > > task id in the value is used?
>> > >
>> > > Thanks again,
>> > >
>> > > Tom
>> > >
>> > > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch 
>> wrote:
>> > >
>> > > > Oh, one more thing:
>> > > >
>> > > > 9. There's no mention of how the status topic is partitioned, or how
>> > > > partitioning will be used by the new topic records. The KIP should
>> > > probably
>> > > > outline this for clarity and completeness.
>> > > >
>> > > > Best regards,
>> > > >
>> > > > Randall
>> > > >
>> > > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch 
>> > wrote:
>> > > >
>> > > > > Thanks, Konstantine. Overall, this KIP looks interesting and
>> really
>> > > > > useful, and for the most part is spot on. I do have a number of
>> > > > > questions/comments about specifics:
>> > > > >
>> > > > >1. The topic records have a value that includes the connector
>> > name,
>> > > > >task number that last reported the topic is used, and the topic
>> > > name.
>> > > > >There's no mention of record timestamps, but I wonder if it'd
>> be
>> > > > useful to
>> > > > >record this. One challenge might be that a connector does not
>> > write
>> > > > to a
>> > > > >topic for a while or the task remains running for long periods
>> of
>> > > > time and
>> > > > >therefore the worker doesn't record that this topic has been
>> newly
>> > > > written
>> > > > >to since it the task was restarted. IOW, the semantics of the
>> > > > timestamp may

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Randall Hauch
My responses are inline:

On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi Randall, Tom and Almog. I'm excited to read your comments. I'll reply in
> separate emails, in order.
>
> First, to Randall's comments, I'm replying below with a reference to the
> comment number:
>
> 1. Although I can imagine we'd be interested in adding additional metadata
> in the record value, I didn't see the need for a timestamp in this first
> draft.
> Now that you mention it, the way I'd interpret a timestamp in the connector
> status record value would be as an approximation of when this
> connector started using this topic.
> Happy to add this if we think this info is useful. Of course, accuracy of
> this information depends on message retention in Kafka and on how long the
> workers have been running without a restart, so this might make this
> approximation less useful if it gets recomputed from time to time.
> To your reference in "Recording active topics" I'll reply below, because
> that's Tom's question too.
>

Makes sense that the timestamp in the connector is the (approximate) time
that the connector has been using the topic. I do think it's worth adding
in the record value (not relying upon Kafka record timestamp).

Regarding "message retention", by default Connect creates the status topic
with compaction but no deletion policy, which means infinite retention.
Don't several things become problematic if finite retention is used on the
status topic, or do we need to worry about this for the active topic
records? Do we need to periodically rewrite all of the active topic
records? If so, we could just write new records using the original
timestamp as originally read by the worker. If the worker does periodically
(maybe just on task startup) rewrite the active topic records, then we'd
have to be sure about the semantics of and interplay with concurrent
explicit "reset" calls.


>
> 2. I'll explain with an example, that maybe is worth adding to the KIP
> because what's expected to happen might not be as obvious as I thought when
> a new topic is recorded.
> Let's say we have two workers, W1 and W2, each running two worker tasks T11
> T12 and T21 T22 respectively associated with a connector C1. All tasks will
> run producers that will produce records to the same topic, "test-topic".
> When the connector starts, both workers track this connector's set of
> active topics as empty. Given the absence of synchronization (that's good)
> in how this information is recorded and persisted in the status topic, all
> four tasks might race to record status messages:
>
> For example:
>
> T11, running at worker W1, will send Kafka records with:
> key: topic-test-topic-connector-C1
> value: "topic": {  "connector": "some-source",  "task": "some-source-TT11",
>  "name": "test-topic" }
>
> and T22, running at worker W2, will send Kafka records with:
> key: topic-test-topic-connector-C1
> value: "topic": {  "connector": "some-source",  "task": "some-source-TT22",
>  "name": "test-topic" }
>
> (similarly tasks T12 and T21 might send topic status records).
>
> These four records (they might not even be four but there's going to be at
> least one) may be written in any order. Because the topic is compacted and
> these records have the same key, eventually only one message will be
> retained.
> The task ID of that message will be the ID of the task that wrote last. I
> can see this being used mostly for troubleshooting.
>

Thanks for the clarification. Might be good to clarify the language a bit
more, though I'm not convinced an example is really needed.


>
> 3. I believe across the whole KIP, when I'm referring to the task entity, I
> imply the worker task. Not the user code that is running as implementation
> of the SourceTask or SinkTask abstract classes. Didn't want to increase
> complexity by referring to a task as worker task.
> But I see your point and I'm going to prefer the terms "worker" and "worker
> task" to highlight that it's the framework that is aware of this feature
> and not the user code.
>

Thank you.


>
> 4. I assumed that absence of changes to the public API would indicate that
> these interfaces/abstract classes remain unchanged. But it's definitely
> worth mentioning that explicitly.
>

Thanks!


>
> 5. That is correct. My intention is to make reset work well with the
> streaming programming model. Resetting (which btw is not mandatory) means
> that you are cleaning the slate for a connector that is currently running,
> and its currently active topics will soon be populated from scratch because
> new records will be produced or consumed.
> But resetting is not required. I see it more like a useful operation, in
> case users want to clean the active topics history, without having to
> delete a connector, since delete has further implications in the
> connector's progress tracking.
>

I do think it's worth trying to clarify in the document what happens when
active 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Konstantine Karantasis
Hi Tom,

here are my replies to your comments:

a) Interesting point. It's indeed worth adding it to the "Rejected
Alternatives" section. I did not consider it as an implementation option.
Connect already has a REST API that, as you note, seems a more natural
choice, especially since it's already in place and this feature actually
fits well under the connector endpoint namespace. See KIP-495 for an
interesting discussion on doubling down on the initial design decision to
use the REST API instead of JMX for similar things.

b) I thought hard but briefly about this. Although there's potential for
ambiguity if someone inspects the key on its own, I believe that if we
combine this information with what we save in the value, the ambiguity is
removed. With respect to whether a connector can override another
connector's key, I don't think that's possible for any combination of
unique topic name and connector name. I hope I'm not missing anything.

c) That means that worker tasks will produce topic status records once they
detect that the worker they are running on does not include a topic that is
used by the tasks in the list of active topics for this connector. But once
the worker adds this topic to the set of active topics for this connector,
then the worker tasks stop producing those messages. They'll produce one
again if the worker stops including that topic in the active topics set for
some reason. I could improve the wording on the KIP, but I'd also like to
know we are on the same page re: the design here.
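
The behavior described in (c) can be sketched roughly as follows. This is only an illustration under assumptions: the `Worker` and `WorkerTask` classes, their methods, and the in-memory `status_log` are hypothetical stand-ins, not Connect classes.

```python
class Worker:
    """Hypothetical stand-in for a worker's view of active topics."""

    def __init__(self):
        self.active_topics = {}  # connector name -> set of topic names
        self.status_log = []     # records "sent" to the status topic

    def on_status_record(self, connector, topic):
        # Called when the worker reads a topic status record back.
        self.active_topics.setdefault(connector, set()).add(topic)

    def reset(self, connector):
        # Forget the connector's active topics (for example after a reset).
        self.active_topics.pop(connector, None)


class WorkerTask:
    def __init__(self, worker, connector, task_id):
        self.worker = worker
        self.connector = connector
        self.task_id = task_id

    def process(self, record_topic):
        """Post a topic status record only if the worker lacks the topic."""
        known = self.worker.active_topics.get(self.connector, set())
        if record_topic in known:
            return False
        self.worker.status_log.append(
            (self.connector, self.task_id, record_topic))
        return True


worker = Worker()
task = WorkerTask(worker, "C1", "T11")
first = task.process("test-topic")         # topic unknown: record posted
worker.on_status_record("C1", "test-topic")
second = task.process("test-topic")        # topic known: nothing posted
worker.reset("C1")
third = task.process("test-topic")         # unknown again: record posted
```

Note that until the worker reads the record back, the same task (or another one) may post again, which is the race described elsewhere in this thread.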

Let me know if the above addresses your questions.
Thanks,
Konstantine



On Wed, Jan 15, 2020 at 12:05 PM Randall Hauch  wrote:

> Almog,
>
> You raise some interesting questions. Comments inline below.
>
> On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra  wrote:
>
> > Hi Konstantine,
> >
> > Thanks for the KIP! This is going to make automatic integration with
> > Connect much more powerful.
> >
> > My thoughts are mostly around freshness of the data and being able to
> > expose that to users. Riffing on Randall's timestamp question - have we
> > considered adding some interval at which point a connector will republish
> > any topics that it encounters and update the timestamp? That way we have
> > some refreshing mechanism that isn't as powerful as the complete reset
> > (which may not be practical in many scenarios).
> >
>
> My question about recording the timestamp at which each active topic record
> was (infrequently) written was more about making a bit more information
> available given the current design, and whether recording a bit more
> information (for very little additional storage cost and no extra runtime
> cost) may be worth it if in the future we figure out how to use this
> information.
>
> I think it's more complicated to try to record the history of when topics
> were most recently used, since that requires recording a lot more active
> topic records than the current proposal. Besides, it's not unexpected that
> source and sink connectors sometimes don't use topics for periods of time.
> A sink connector only consumes from a topic when there are additional
> records to consume, and a source connector only needs to write to a topic
> when there is information in the upstream system targeted to that topic. An
> example of the latter is that most database connectors will write to a
> topic for a particular table only when rows in that table have changed.
>
>
> > I also agree with Randall's other point (Would it be better to not
> > automatically reset connector's active topics when a sink connector is
> > restarted?). I think keeping the behavior as symmetrical between sink and
> > source connectors is a good idea.
> >
> > Lastly, with regards to the API, I can imagine it is also pretty useful
> to
> > answer the inverse question: "which connectors write to topic X". Perhaps
> > we can achieve this by letting the users compute it and just expose an
> API
> > that returns the entire mapping at once (instead of needing to call the
> > /connectors/{name}/topics endpoint for each connector).
>
>
> It may be worth considering a method such as /topics that would produce a
> result that is an aggregate of the potentially many
> /connectors/{name}/topics responses, especially if the result of the /topics
> is an array of the individual responses from /connectors/{name}/topics. The
> benefit of a single request is that the answer to "which connectors use
> topic X" can be computed using simple tools such as 'jq'. And, if tooling
> frequently fetches the active topic information for all connectors,
> providing the aggregate method would reduce the load on the tooling and the
> server. It may also be relatively easy to implement.
>
>
> > Otherwise, looks good to me! Hits the requirements that I had in mind on
> > the nose.
> > - Almog
> >
> > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley  wrote:
> >
> > > Hi Konstantine,
> > >
> > > Thanks for the KIP, I can see how it could be useful.
> > >
> > > a) Did 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Konstantine Karantasis
Hi Randall, Tom and Almog. I'm excited to read your comments. I'll reply in
separate emails, in order.

First, to Randall's comments, I'm replying below with a reference to the
comment number:

1. Although I can imagine we'd be interested in adding additional metadata
in the record value, I didn't see the need for a timestamp in this first
draft.
Now that you mention it, the way I'd interpret a timestamp in the connector
status record value would be as an approximation of when this
connector started using this topic.
Happy to add this if we think this info is useful. Of course, accuracy of
this information depends on message retention in Kafka and on how long the
workers have been running without a restart, so this might make this
approximation less useful if it gets recomputed from time to time.
To your reference in "Recording active topics" I'll reply below, because
that's Tom's question too.

2. I'll explain with an example, that maybe is worth adding to the KIP
because what's expected to happen might not be as obvious as I thought when
a new topic is recorded.
Let's say we have two workers, W1 and W2, each running two worker tasks T11
T12 and T21 T22 respectively associated with a connector C1. All tasks will
run producers that will produce records to the same topic, "test-topic".
When the connector starts, both workers track this connector's set of
active topics as empty. Given the absence of synchronization (that's good)
in how this information is recorded and persisted in the status topic, all
four tasks might race to record status messages:

For example:

T11, running at worker W1, will send Kafka records with:
key: topic-test-topic-connector-C1
value: "topic": {  "connector": "some-source",  "task": "some-source-TT11",
 "name": "test-topic" }

and T22, running at worker W2, will send Kafka records with:
key: topic-test-topic-connector-C1
value: "topic": {  "connector": "some-source",  "task": "some-source-TT22",
 "name": "test-topic" }

(similarly tasks T12 and T21 might send topic status records).

These four records (they might not even be four but there's going to be at
least one) may be written in any order. Because the topic is compacted and
these records have the same key, eventually only one message will be
retained.
The task ID of that message will be the ID of the task that wrote last. I
can see this being used mostly for troubleshooting.
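
A minimal sketch of the "last writer wins" outcome, assuming compaction eventually keeps only the latest value per key. The record values are simplified placeholders (not the exact value schema), and `compact` is a hypothetical helper that models the end state, not Kafka's log cleaner.

```python
def compact(log):
    """Keep only the latest value per key; a None value is a tombstone."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)  # tombstone removes the key
        else:
            latest[key] = value
    return latest


key = "topic-test-topic-connector-C1"
# Four racing writes with the same key, in one possible arrival order.
log = [
    (key, {"task": "T11", "name": "test-topic"}),
    (key, {"task": "T22", "name": "test-topic"}),
    (key, {"task": "T12", "name": "test-topic"}),
    (key, {"task": "T21", "name": "test-topic"}),
]
retained = compact(log)

# A later reset is modeled as a tombstone for the same key.
after_reset = compact(log + [(key, None)])
```

Whatever the arrival order, one record survives per key, and its task ID is that of whichever task wrote last.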

3. Across the whole KIP, when I refer to the task entity, I mean the
worker task, not the user code that runs as an implementation
of the SourceTask or SinkTask abstract classes. I didn't want to increase
complexity by referring to a task as a worker task.
But I see your point and I'm going to prefer the terms "worker" and "worker
task" to highlight that it's the framework that is aware of this feature
and not the user code.

4. I assumed that the absence of changes to the public API would indicate that
these interfaces/abstract classes remain unchanged. But it's definitely
worth mentioning that explicitly.

5. That is correct. My intention is to make reset work well with the
streaming programming model. Resetting (which btw is not mandatory) means
that you are cleaning the slate for a connector that is currently running,
and its currently active topics will soon be populated from scratch because
new records will be produced or consumed.
But resetting is not required. I see it more like a useful operation, in
case users want to clean the active topics history, without having to
delete a connector, since delete has further implications in the
connector's progress tracking.

6. I fixed the typo - thanks! I'm very much in favor of preserving symmetry
between the two connector types. This definitely has more long-term
benefits and may help to avoid confusion. However, the asymmetry here is
inherited from the asymmetry that exists today between source and sink
connectors.
Source connectors don't list topics in their configurations but sink
connectors do. So, if a user reconfigures a sink connector with a different
set of topics and we don't reset the topics based on the new configs (my
thought here was to match the new configuration with the set of active
topics), the old topics, no longer listed in the connector's
configuration, will keep showing up as active topics. The user will have to
explicitly reset the active topics after reconfiguring to avoid this. If
there's consensus that preserving this asymmetry is worse than having to
reset the active topics, I'm happy to change this in the KIP.
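
The matching step for a reconfigured sink connector could look roughly like this. It is a sketch under assumptions: only the comma-separated `topics` setting is considered (a connector configured through `topics.regex` is not handled), and both helper names are hypothetical.

```python
def configured_topics(sink_config: dict) -> set:
    # `topics` is the comma-separated list in a sink connector's config.
    raw = sink_config.get("topics", "")
    return {t.strip() for t in raw.split(",") if t.strip()}


def topics_to_tombstone(active_topics: set, new_config: dict) -> list:
    # Previously tracked topics that the new configuration no longer lists.
    return sorted(active_topics - configured_topics(new_config))


active = {"orders", "payments", "refunds"}
new_config = {"topics": "orders, shipments"}
stale = topics_to_tombstone(active, new_config)
```

Here `stale` holds the topics for which tombstones would be appended; `shipments` only becomes active once a task actually consumes from it.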

7. What I'm trying to avoid here is the following situation: for some reason (a
sequence of failures to write tombstones to the status topic), stale topic
status records remain in that topic even after a connector has been
deleted. Requiring a connector with the same name to be restarted just to apply
a follow-up reset of active topics doesn't seem necessary. I like the idea
of decoupling connector existence from the maintenance of the status topic.
Of 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Randall Hauch
Almog,

You raise some interesting questions. Comments inline below.

On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra  wrote:

> Hi Konstantine,
>
> Thanks for the KIP! This is going to make automatic integration with
> Connect much more powerful.
>
> My thoughts are mostly around freshness of the data and being able to
> expose that to users. Riffing on Randall's timestamp question - have we
> considered adding some interval at which point a connector will republish
> any topics that it encounters and update the timestamp? That way we have
> some refreshing mechanism that isn't as powerful as the complete reset
> (which may not be practical in many scenarios).
>

My question about recording the timestamp at which each active topic record
was (infrequently) written was more about making a bit more information
available given the current design, and whether recording a bit more
information (for very little additional storage cost and no extra runtime
cost) may be worth it if in the future we figure out how to use this
information.

I think it's more complicated to try to record the history of when topics
were most recently used, since that requires recording a lot more active
topic records than the current proposal. Besides, it's not unexpected that
source and sink connectors sometimes don't use topics for periods of time.
A sink connector only consumes from a topic when there are additional
records to consume, and a source connector only needs to write to a topic
when there is information in the upstream system targeted to that topic. An
example of the latter is that most database connectors will write to a
topic for a particular table only when rows in that table have changed.


> I also agree with Randall's other point (Would it be better to not
> automatically reset connector's active topics when a sink connector is
> restarted?). I think keeping the behavior as symmetrical between sink and
> source connectors is a good idea.
>
> Lastly, with regards to the API, I can imagine it is also pretty useful to
> answer the inverse question: "which connectors write to topic X". Perhaps
> we can achieve this by letting the users compute it and just expose an API
> that returns the entire mapping at once (instead of needing to call the
> /connectors/{name}/topics endpoint for each connector).


It may be worth considering a method such as /topics that would produce a
result that is an aggregate of the potentially many
/connectors/{name}/topics responses, especially if the result of the /topics
is an array of the individual responses from /connectors/{name}/topics. The
benefit of a single request is that the answer to "which connectors use
topic X" can be computed using simple tools such as 'jq'. And, if tooling
frequently fetches the active topic information for all connectors,
providing the aggregate method would reduce the load on the tooling and the
server. It may also be relatively easy to implement.
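
As a rough sketch of how a client could answer "which connectors use topic X" from such an aggregate: the response shape below (an array of per-connector objects, each with a `topics` list) is an assumption for illustration, not something the KIP has settled, and `connectors_by_topic` is a hypothetical helper.

```python
from collections import defaultdict


def connectors_by_topic(aggregate):
    """Invert an assumed aggregate response into topic -> connectors."""
    inverse = defaultdict(list)
    for entry in aggregate:
        for connector, info in entry.items():
            for topic in info["topics"]:
                inverse[topic].append(connector)
    return {topic: sorted(names) for topic, names in inverse.items()}


# Assumed shape: one object per connector, as if each element were an
# individual /connectors/{name}/topics response.
aggregate = [
    {"jdbc-source": {"topics": ["orders", "customers"]}},
    {"s3-sink": {"topics": ["orders"]}},
]
by_topic = connectors_by_topic(aggregate)
```

With a single request, the inverse mapping is a few lines of client code, which is the same kind of transformation a 'jq' filter would perform.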


> Otherwise, looks good to me! Hits the requirements that I had in mind on
> the nose.
> - Almog
>
> On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley  wrote:
>
> > Hi Konstantine,
> >
> > Thanks for the KIP, I can see how it could be useful.
> >
> > a) Did you consider using a metric for this? I don't think it would
> satisfy
> > all the use cases you have in mind, but you could mention it in the
> > rejected alternatives.
> >
> > b) If the topic name contains the string "-connector" then the key format
> > is ambiguous. This isn't necessarily fatal because the value will
> > disambiguate, but it could be misleading. Any reason not to just use a
> JSON
> > key, and simplify the value?
> >
> > c) I didn't understand this part: "As soon as a worker detects the
> addition
> > of a topic to a connector's set of active topics, the worker will cease
> to
> > post update messages to the status.storage.topic for that connector. ".
> I'm
> > sure I've overlooking something but why is this necessary? Is this were
> the
> > task id in the value is used?
> >
> > Thanks again,
> >
> > Tom
> >
> > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch  wrote:
> >
> > > Oh, one more thing:
> > >
> > > 9. There's no mention of how the status topic is partitioned, or how
> > > partitioning will be used by the new topic records. The KIP should
> > probably
> > > outline this for clarity and completeness.
> > >
> > > Best regards,
> > >
> > > Randall
> > >
> > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch 
> wrote:
> > >
> > > > Thanks, Konstantine. Overall, this KIP looks interesting and really
> > > > useful, and for the most part is spot on. I do have a number of
> > > > questions/comments about specifics:
> > > >
> > > >1. The topic records have a value that includes the connector
> name,
> > > >task number that last reported the topic is used, and the topic
> > name.
> > > >There's no mention of record timestamps, but I wonder if it'd be
> > > useful to
> > > >record this. One challenge might be that a connector does not
> 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Almog Gavra
Hi Konstantine,

Thanks for the KIP! This is going to make automatic integration with
Connect much more powerful.

My thoughts are mostly around freshness of the data and being able to
expose that to users. Riffing on Randall's timestamp question - have we
considered adding some interval at which point a connector will republish
any topics that it encounters and update the timestamp? That way we have
some refreshing mechanism that isn't as powerful as the complete reset
(which may not be practical in many scenarios).

I also agree with Randall's other point (Would it be better to not
automatically reset connector's active topics when a sink connector is
restarted?). I think keeping the behavior as symmetrical between sink and
source connectors is a good idea.

Lastly, with regards to the API, I can imagine it is also pretty useful to
answer the inverse question: "which connectors write to topic X". Perhaps
we can achieve this by letting the users compute it and just expose an API
that returns the entire mapping at once (instead of needing to call the
/connectors/{name}/topics endpoint for each connector).

Otherwise, looks good to me! Hits the requirements that I had in mind on
the nose.
- Almog

On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley  wrote:

> Hi Konstantine,
>
> Thanks for the KIP, I can see how it could be useful.
>
> a) Did you consider using a metric for this? I don't think it would satisfy
> all the use cases you have in mind, but you could mention it in the
> rejected alternatives.
>
> b) If the topic name contains the string "-connector" then the key format
> is ambiguous. This isn't necessarily fatal because the value will
> disambiguate, but it could be misleading. Any reason not to just use a JSON
> key, and simplify the value?
>
> c) I didn't understand this part: "As soon as a worker detects the addition
> of a topic to a connector's set of active topics, the worker will cease to
> post update messages to the status.storage.topic for that connector. ". I'm
> sure I've overlooking something but why is this necessary? Is this were the
> task id in the value is used?
>
> Thanks again,
>
> Tom
>
> On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch  wrote:
>
> > Oh, one more thing:
> >
> > 9. There's no mention of how the status topic is partitioned, or how
> > partitioning will be used by the new topic records. The KIP should
> probably
> > outline this for clarity and completeness.
> >
> > Best regards,
> >
> > Randall
> >
> > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch  wrote:
> >
> > > Thanks, Konstantine. Overall, this KIP looks interesting and really
> > > useful, and for the most part is spot on. I do have a number of
> > > questions/comments about specifics:
> > >
> > >1. The topic records have a value that includes the connector name,
> > >task number that last reported the topic is used, and the topic
> name.
> > >There's no mention of record timestamps, but I wonder if it'd be
> > useful to
> > >record this. One challenge might be that a connector does not write
> > to a
> > >topic for a while or the task remains running for long periods of
> > time and
> > >therefore the worker doesn't record that this topic has been newly
> > written
> > >to since it the task was restarted. IOW, the semantics of the
> > timestamp may
> > >be a bit murky. Have you thought about recording the timestamp, and
> > if so
> > >what are the pros and cons?
> > >- The "Recording active topics" section says the following:
> > >   "As soon as a worker detects the addition of a topic to a
> > >   connector's set of active topics, all the connector's tasks that
> > inspect
> > >   source or sink records will cease to post update messages to the
> > >   status.storage.topic."
> > >   This probably means the timestamp won't be very useful.
> > >2. The KIP says "the Kafka record value stores the ID of the task
> that
> > >succeeded to store a topic status record last." However, this is a
> bit
> > >unclear: is it really storing the last task that successfully wrote
> > to that
> > >topic (as this would require very frequent writes to this topic), or
> > is it
> > >more that this is the task that was last *recorded* as having
> written
> > >to the topic? (Here, "recorded" could be a bit of a gray area, since
> > this
> > >would depend on the how the worker periodically records this
> > information.)
> > >Any kind of clarity here might be helpful.
> > >3. In the "Recording active topics" section (and the surrounding
> > >sections), the "task" is used ambiguously. For example, "when its
> > tasks
> > >start processing their first records ... these tasks will start
> > inspecting
> > >which is the Kafka topic of each of these records". IIUC, the first
> > "task"
> > >mentioned is the connector's task, and the second is the worker's
> > task. Do
> > >we need to distinguish this more clearly?
> > >4. 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-15 Thread Tom Bentley
Hi Konstantine,

Thanks for the KIP, I can see how it could be useful.

a) Did you consider using a metric for this? I don't think it would satisfy
all the use cases you have in mind, but you could mention it in the
rejected alternatives.

b) If the topic name contains the string "-connector" then the key format
is ambiguous. This isn't necessarily fatal because the value will
disambiguate, but it could be misleading. Any reason not to just use a JSON
key, and simplify the value?

c) I didn't understand this part: "As soon as a worker detects the addition
of a topic to a connector's set of active topics, the worker will cease to
post update messages to the status.storage.topic for that connector." I'm
sure I'm overlooking something, but why is this necessary? Is this where the
task ID in the value is used?
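
To illustrate point (b), here is a small sketch of the ambiguity. It assumes the string key layout `topic-<topic>-connector-<connector>` as in the KIP's example; the JSON key layout is just one hypothetical alternative, and both function names are made up for the example.

```python
import json


def string_key(topic: str, connector: str) -> str:
    # Assumed layout, following the KIP's example key.
    return f"topic-{topic}-connector-{connector}"


def json_key(topic: str, connector: str) -> str:
    # Hypothetical alternative: a structured key with explicit fields.
    return json.dumps({"topic": topic, "connector": connector},
                      sort_keys=True)


# Two different (topic, connector) pairs.
a = ("orders-connector-x", "y")
b = ("orders", "x-connector-y")

# The string keys are identical, so the key alone cannot tell them apart.
assert string_key(*a) == string_key(*b)

# Structured keys keep the two pairs distinct.
assert json_key(*a) != json_key(*b)
```

With the string layout, the value is needed to tell these two pairs apart; a structured key avoids that dependency.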

Thanks again,

Tom

On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch  wrote:

> Oh, one more thing:
>
> 9. There's no mention of how the status topic is partitioned, or how
> partitioning will be used by the new topic records. The KIP should probably
> outline this for clarity and completeness.
>
> Best regards,
>
> Randall
>
> On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch  wrote:
>
> > Thanks, Konstantine. Overall, this KIP looks interesting and really
> > useful, and for the most part is spot on. I do have a number of
> > questions/comments about specifics:
> >
> >1. The topic records have a value that includes the connector name,
> >task number that last reported the topic is used, and the topic name.
> >There's no mention of record timestamps, but I wonder if it'd be
> useful to
> >record this. One challenge might be that a connector does not write
> to a
> >topic for a while or the task remains running for long periods of
> time and
> >therefore the worker doesn't record that this topic has been newly
> written
> >to since it the task was restarted. IOW, the semantics of the
> timestamp may
> >be a bit murky. Have you thought about recording the timestamp, and
> if so
> >what are the pros and cons?
> >- The "Recording active topics" section says the following:
> >   "As soon as a worker detects the addition of a topic to a
> >   connector's set of active topics, all the connector's tasks that
> inspect
> >   source or sink records will cease to post update messages to the
> >   status.storage.topic."
> >   This probably means the timestamp won't be very useful.
> >2. The KIP says "the Kafka record value stores the ID of the task that
> >succeeded to store a topic status record last." However, this is a bit
> >unclear: is it really storing the last task that successfully wrote
> to that
> >topic (as this would require very frequent writes to this topic), or
> is it
> >more that this is the task that was last *recorded* as having written
> >to the topic? (Here, "recorded" could be a bit of a gray area, since
> this
> >would depend on the how the worker periodically records this
> information.)
> >Any kind of clarity here might be helpful.
> >3. In the "Recording active topics" section (and the surrounding
> >sections), the "task" is used ambiguously. For example, "when its
> tasks
> >start processing their first records ... these tasks will start
> inspecting
> >which is the Kafka topic of each of these records". IIUC, the first
> "task"
> >mentioned is the connector's task, and the second is the worker's
> task. Do
> >we need to distinguish this more clearly?
> >4. Maybe I missed it, but does this KIP explicitly say that the
> >Connector API is unchanged? It's probably worth pointing out to help
> >assuage any concerns that connector implementations have to change to
> make
> >use of this feature.
> >5. In the "Resetting a connector's set of active topics" section the
> >behavior is not exactly clear. Consider a user running connector "A",
> the
> >connector has been fully started and is processing records, and the
> worker
> >has recorded topic usage records. Then the user resets the active
> topics
> >for connector A while the connector is still running? If the connector
> >writes to no new topics, before the tasks are rebalanced then is it
> correct
> >that Connect would report no active topics? And after the tasks are
> >rebalanced, will the worker record any topics used by connector A?
> >6. In the "Restaring" (misspelled) section: "Reconfiguring a source
> >connector has also no altering effect for a source connector.
> However, when
> >reconfiguring a sink connector if the new configuration no longer
> includes
> >any of the previously tracked topics, these topics will be removed
> from the
> >set of active topics for this sink connector by appending tombstone
> >messages appropriately after the reconfiguration of the connector."
> Would
> >it be better to not automatically reset a connector's active topics

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-14 Thread Randall Hauch
Oh, one more thing:

9. There's no mention of how the status topic is partitioned, or how
partitioning will be used by the new topic records. The KIP should probably
outline this for clarity and completeness.

Best regards,

Randall

On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch  wrote:

> Thanks, Konstantine. Overall, this KIP looks interesting and really
> useful, and for the most part is spot on. I do have a number of
> questions/comments about specifics:
>
>1. The topic records have a value that includes the connector name,
>task number that last reported the topic is used, and the topic name.
>There's no mention of record timestamps, but I wonder if it'd be useful to
>record this. One challenge might be that a connector does not write to a
>topic for a while or the task remains running for long periods of time and
>therefore the worker doesn't record that this topic has been newly written
>to since the task was restarted. IOW, the semantics of the timestamp may
>be a bit murky. Have you thought about recording the timestamp, and if so
>what are the pros and cons?
>- The "Recording active topics" section says the following:
>   "As soon as a worker detects the addition of a topic to a
>   connector's set of active topics, all the connector's tasks that inspect
>   source or sink records will cease to post update messages to the
>   status.storage.topic."
>   This probably means the timestamp won't be very useful.
>2. The KIP says "the Kafka record value stores the ID of the task that
>succeeded to store a topic status record last." However, this is a bit
>unclear: is it really storing the last task that successfully wrote to that
>topic (as this would require very frequent writes to this topic), or is it
>more that this is the task that was last *recorded* as having written
>to the topic? (Here, "recorded" could be a bit of a gray area, since this
>would depend on how the worker periodically records this information.)
>Any kind of clarity here might be helpful.
>3. In the "Recording active topics" section (and the surrounding
>sections), the "task" is used ambiguously. For example, "when its tasks
>start processing their first records ... these tasks will start inspecting
>which is the Kafka topic of each of these records". IIUC, the first "task"
>mentioned is the connector's task, and the second is the worker's task. Do
>we need to distinguish this more clearly?
>4. Maybe I missed it, but does this KIP explicitly say that the
>Connector API is unchanged? It's probably worth pointing out to help
>assuage any concerns that connector implementations have to change to make
>use of this feature.
>5. In the "Resetting a connector's set of active topics" section the
>behavior is not exactly clear. Consider a user running connector "A": the
>connector has been fully started and is processing records, and the worker
>has recorded topic usage records. Then the user resets the active topics
>for connector A while the connector is still running. If the connector
>writes to no new topics before the tasks are rebalanced, is it correct
>that Connect would report no active topics? And after the tasks are
>rebalanced, will the worker record any topics used by connector A?
>6. In the "Restaring" (misspelled) section: "Reconfiguring a source
>connector has also no altering effect for a source connector. However, when
>reconfiguring a sink connector if the new configuration no longer includes
>any of the previously tracked topics, these topics will be removed from the
>set of active topics for this sink connector by appending tombstone
>messages appropriately after the reconfiguration of the connector." Would
>it be better to not automatically reset a connector's active topics when a
>sink connector is restarted? Isn't that more consistent with the
>"Resetting" behavior and the goals at the top of the KIP: "it'd be useful
>for users, operators and applications to know which are the topics that a
>connector has used since it was first created"?
>7. For the `PUT /connectors/{name}/topics/reset` endpoint, the KIP says
>"this request can be reapplied after the deletion of the connector". IOW,
>even though a connector with that name doesn't exist, we can still make
>this request? How does this compare with other methods such as "status"?
>8. What are the security implications of this proposal?
>
> As you can see, most of these can probably be addressed without much work.
>
> Best regards,
>
> Randall
>
> On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <
> konstant...@confluent.io> wrote:
>
>> Hi all.
>>
>> I just posted KIP-558: Track the set of actively used topics by connectors
>> in Kafka Connect
>>
>> Wiki link here:
>>
>> 

Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect

2020-01-14 Thread Randall Hauch
Thanks, Konstantine. Overall, this KIP looks interesting and really useful,
and for the most part is spot on. I do have a number of questions/comments
about specifics:

   1. The topic records have a value that includes the connector name, task
   number that last reported the topic is used, and the topic name. There's no
   mention of record timestamps, but I wonder if it'd be useful to record
   this. One challenge might be that a connector does not write to a topic for
   a while or the task remains running for long periods of time and therefore
   the worker doesn't record that this topic has been newly written to since
   the task was restarted. IOW, the semantics of the timestamp may be a bit
   murky. Have you thought about recording the timestamp, and if so what are
   the pros and cons?
   - The "Recording active topics" section says the following:
     "As soon as a worker detects the addition of a topic to a connector's
     set of active topics, all the connector's tasks that inspect source or
     sink records will cease to post update messages to the
     status.storage.topic."
     This probably means the timestamp won't be very useful.
   2. The KIP says "the Kafka record value stores the ID of the task that
   succeeded to store a topic status record last." However, this is a bit
   unclear: is it really storing the last task that successfully wrote to that
   topic (as this would require very frequent writes to this topic), or is it
   more that this is the task that was last *recorded* as having written to
   the topic? (Here, "recorded" could be a bit of a gray area, since this
   would depend on how the worker periodically records this information.)
   Any kind of clarity here might be helpful.
   3. In the "Recording active topics" section (and the surrounding
   sections), the term "task" is used ambiguously. For example, "when its tasks
   start processing their first records ... these tasks will start inspecting
   which is the Kafka topic of each of these records". IIUC, the first "task"
   mentioned is the connector's task, and the second is the worker's task. Do
   we need to distinguish this more clearly?
   4. Maybe I missed it, but does this KIP explicitly say that the
   Connector API is unchanged? It's probably worth pointing out to help
   assuage any concerns that connector implementations have to change to make
   use of this feature.
   5. In the "Resetting a connector's set of active topics" section the
   behavior is not exactly clear. Consider a user running connector "A": the
   connector has been fully started and is processing records, and the worker
   has recorded topic usage records. Then the user resets the active topics
   for connector A while the connector is still running. If the connector
   writes to no new topics before the tasks are rebalanced, is it correct
   that Connect would report no active topics? And after the tasks are
   rebalanced, will the worker record any topics used by connector A?
   6. In the "Restaring" (misspelled) section: "Reconfiguring a source
   connector has also no altering effect for a source connector. However, when
   reconfiguring a sink connector if the new configuration no longer includes
   any of the previously tracked topics, these topics will be removed from the
   set of active topics for this sink connector by appending tombstone
   messages appropriately after the reconfiguration of the connector." Would
   it be better to not automatically reset a connector's active topics when a
   sink connector is restarted? Isn't that more consistent with the
   "Resetting" behavior and the goals at the top of the KIP: "it'd be useful
   for users, operators and applications to know which are the topics that a
   connector has used since it was first created"?
   7. For the `PUT /connectors/{name}/topics/reset` endpoint, the KIP says
   "this request can be reapplied after the deletion of the connector". IOW,
   even though a connector with that name doesn't exist, we can still make
   this request? How does this compare with other methods such as "status"?
   8. What are the security implications of this proposal?
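
To make points 1, 2 and 5 concrete, here is a toy in-memory model in Python
of one possible reading of the KIP. It is only a sketch: the names
`ActiveTopicsLog`, `observe` and `reset` are hypothetical and are not part of
Kafka Connect. It assumes that a topic record is appended only the first time
a (connector, topic) pair is seen, that the value keeps the ID of the task
that reported it first, and that a reset appends one tombstone per tracked
topic. Whether a running connector re-records its topics right after a reset
is exactly the open question in point 5; the sketch simply assumes it does so
on the next processed record.

```python
from dataclasses import dataclass, field


@dataclass
class ActiveTopicsLog:
    """Toy stand-in for a compacted status topic (hypothetical, not Connect code)."""

    # key: (connector, topic) -> value: ID of the task that first reported it
    records: dict = field(default_factory=dict)
    # number of records (including tombstones) appended to the log
    writes: int = 0

    def observe(self, connector: str, task: int, topic: str) -> None:
        """Called for every processed record; appends only if the topic is new."""
        key = (connector, topic)
        if key not in self.records:
            self.records[key] = task
            self.writes += 1

    def reset(self, connector: str) -> None:
        """Assumed reset behavior: one tombstone per tracked topic."""
        for key in [k for k in self.records if k[0] == connector]:
            del self.records[key]
            self.writes += 1

    def active_topics(self, connector: str) -> list:
        """Return the connector's tracked topics in sorted order."""
        return sorted(t for (c, t) in self.records if c == connector)


log = ActiveTopicsLog()
# Two tasks of connector "A" process four records across two topics.
for task, topic in [(0, "orders"), (1, "orders"), (0, "orders"), (1, "payments")]:
    log.observe("A", task, topic)
```

Under these assumptions only two records are appended for the four processed
records, which is why a timestamp stored in the record would reflect the
first detection of a topic rather than its most recent use.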

As you can see, most of these can probably be addressed without much work.

Best regards,

Randall

On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <
konstant...@confluent.io> wrote:

> Hi all.
>
> I just posted KIP-558: Track the set of actively used topics by connectors
> in Kafka Connect
>
> Wiki link here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
>
> I think it's a nice extension to follow up on KIP-158 and a useful feature
> to the ever increasing number of applications that are built around Kafka
> Connect.
> Would love to hear what you think.
>
> Best,
> Konstantine
>