Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Randall, I updated the KIP to add your suggestion above and better explain the few points that were outstanding since last week.

Almog, I agree that 'timestamp' was too generic. I'm ok with 'discoverTimestamp'. I updated the KIP with your naming suggestion.

Best,
Konstantine

On Tue, Jan 21, 2020 at 10:43 AM Almog Gavra wrote:

> Thanks again Konstantine - really excited about this KIP!
>
> I'm about ready to +1 (non-binding) it with just one comment left: What do you think about changing the timestamp field to "discoverTimestamp" or something like that to indicate that it is the timestamp of the _first_ time we recorded/discovered this topic? This is important if we later want to add a "mostRecentTimestamp" field to distinguish between the two.
>
> Almog
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks again Konstantine - really excited about this KIP!

I'm about ready to +1 (non-binding) it with just one comment left: What do you think about changing the timestamp field to "discoverTimestamp" or something like that to indicate that it is the timestamp of the _first_ time we recorded/discovered this topic? This is important if we later want to add a "mostRecentTimestamp" field to distinguish between the two.

Almog

On Tue, Jan 21, 2020 at 8:01 AM Randall Hauch wrote:

> Thanks, Konstantine.
>
> One minor request to clarify the following sentence:
>
>     As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will cease to post update messages to the status.storage.topic for that connector.
>
> As it stands, it sounds like the worker will not write *any more active topic records for this or any connectors* to the topic specified by the `status.storage.topic` worker configuration once the worker detects (by reading) a new active topic. I suspect that this is not the intention, and that instead it is trying to say that no more messages will be written *for this topic and connector*. IOW, something more like:
>
>     As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will not post to the status.storage.topic additional update records for the connector and this newly-detected active topic.
>
> Otherwise, this KIP looks great!
>
> Best regards,
>
> Randall
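The distinction Almog draws between the two timestamps can be illustrated with a small sketch. It is only an illustration under stated assumptions: `TopicUsage` and `observe` are hypothetical names, and `most_recent_timestamp` models the possible future "mostRecentTimestamp" field, which is not part of the KIP.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TopicUsage:
    """Hypothetical in-memory form of one active-topic entry."""
    topic: str
    connector: str
    discover_timestamp: int  # ms since epoch, set when the topic is first recorded
    most_recent_timestamp: Optional[int] = None  # hypothetical future field


def observe(existing: Optional[TopicUsage], topic: str, connector: str, now_ms: int) -> TopicUsage:
    """Record that `connector` used `topic` at `now_ms`."""
    if existing is None:
        # First discovery: both timestamps start at the same instant.
        return TopicUsage(topic, connector, now_ms, now_ms)
    # Later observations never move the discovery timestamp.
    return TopicUsage(existing.topic, existing.connector, existing.discover_timestamp, now_ms)
```

With this shape, a generic `timestamp` field would be ambiguous once a second timestamp exists, which is the motivation for the more specific name.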
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks, Konstantine.

One minor request to clarify the following sentence:

    As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will cease to post update messages to the status.storage.topic for that connector.

As it stands, it sounds like the worker will not write *any more active topic records for this or any connectors* to the topic specified by the `status.storage.topic` worker configuration once the worker detects (by reading) a new active topic. I suspect that this is not the intention, and that instead it is trying to say that no more messages will be written *for this topic and connector*. IOW, something more like:

    As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will not post to the status.storage.topic additional update records for the connector and this newly-detected active topic.

Otherwise, this KIP looks great!

Best regards,

Randall

On Fri, Jan 17, 2020 at 8:04 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> Hi all,
>
> I've updated KIP-558 with the following based on our previous discussion:
>
> * Added timestamp to the metadata (the record value).
> * The KIP now mentions a metric-based implementation in the Rejected Alternatives section.
> * The record key format is now using the single character ':' as a separator between topic-${topic name} and connector-${connector name}
> * Added a bullet point to mention that the topic storing the new information can be a partitioned topic.
> * The KIP mentions that the feature does not require rebuilding connectors (no changes in public interfaces/classes).
> * Added a security section.
> * KIP preserves symmetry with respect to reset between both types of connectors and keeps reset and config as separate, unrelated endpoints.
>
> Given that we made significant progress these past few days and only a few minor improvements in the KIP's text are remaining, I'd like to start the vote today, so that we give this KIP the necessary time (72 hours) to have a chance to be voted on by the KIP deadline next Wednesday, Jan 22nd.
> Let's return here, or to the main vote thread, for any comments (minor or major).
>
> Best,
> Konstantine
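The per-connector, per-topic behavior that Randall's rewording describes can be sketched as follows. This is a minimal model, not the KIP's implementation: `ActiveTopicsView` and its method names are hypothetical, and the `post` callback merely stands in for a write to the topic named by `status.storage.topic`.

```python
from typing import Callable, Dict, Set


class ActiveTopicsView:
    """Hypothetical in-memory view of active topics held by one worker."""

    def __init__(self, post: Callable[[str, str], None]) -> None:
        self._post = post  # stand-in for producing a record to the status topic
        self._active: Dict[str, Set[str]] = {}

    def on_status_record_read(self, connector: str, topic: str) -> None:
        """The worker read an active-topic record back from the status topic."""
        self._active.setdefault(connector, set()).add(topic)

    def on_record_processed(self, connector: str, topic: str) -> bool:
        """A task of `connector` used `topic`; post an update unless already detected.

        Returns True if an update record was posted.
        """
        if topic in self._active.get(connector, set()):
            # Already detected for this connector AND this topic: stay silent.
            return False
        self._post(connector, topic)
        return True
```

Note that suppression is keyed on the (connector, topic) pair, so the same worker keeps posting updates for other topics of the same connector and for other connectors.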
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi all,

I've updated KIP-558 with the following based on our previous discussion:

* Added timestamp to the metadata (the record value).
* The KIP now mentions a metric-based implementation in the Rejected Alternatives section.
* The record key format is now using the single character ':' as a separator between topic-${topic name} and connector-${connector name}
* Added a bullet point to mention that the topic storing the new information can be a partitioned topic.
* The KIP mentions that the feature does not require rebuilding connectors (no changes in public interfaces/classes).
* Added a security section.
* KIP preserves symmetry with respect to reset between both types of connectors and keeps reset and config as separate, unrelated endpoints.

Given that we made significant progress these past few days and only a few minor improvements in the KIP's text are remaining, I'd like to start the vote today, so that we give this KIP the necessary time (72 hours) to have a chance to be voted on by the KIP deadline next Wednesday, Jan 22nd.
Let's return here, or to the main vote thread, for any comments (minor or major).

Best,
Konstantine

On Fri, Jan 17, 2020 at 3:51 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> Thanks for the follow up Chris. Replies below:
>
> On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton wrote:
>
> > Thanks, Konstantine. Just a few more questions:
> >
> > > > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is there any anticipated case where it would be useful to have topic tracking enabled but with resets disabled? We could easily add this configuration later if a use case arises, but if we add it now it'll be difficult to remove.
> > >
> > > The motivation is for operators of a Connect cluster to be able to disable resetting the history of active topics altogether, while allowing at the same time to view the active sets.
> >
> > What I was trying to ask was, is there a use case for enabling the latter but not the former? We should be careful about adding extra worker configs and unless we can anticipate a reasonable scenario in which this would happen, we should err on the side of caution and avoid adding a config that would be difficult to remove later but, comparably, much easier to add.
>
> The application use case is the ability to have immutable histories of topic usage or control when resets are allowed and how they are performed (e.g. resets could be allowed briefly during a maintenance phase and get disabled again).
> I'm also never thrilled when I add an extra configuration parameter. However namespacing here will help with the extra cognitive burden. Similarly the defaults should cover most use cases too.
>
> > > > 5. As far as automatic resets for sink connectors go, I agree with your reasoning about the inherent asymmetry between sinks and sources, and with the motivation to avoid confusing users by listing no-longer-consumed topics in the active topics for a sink connector. I think that this asymmetry is worth it to avoid a scenario where a connector is reconfigured to only consume from topic "foo" but, from a prior configuration, topic "bar" is still listed in its active topics.
> > > > I do want to request clarification on the meaning of the phrase "any topics no longer consumed" as used under the header "Restarting, reconfiguring or deleting a connector". Does this mean that the current set of active topics for the connector will be filtered and any that are no longer contained in the sink connector's "topics" config or matched by its "topics.regex" config will be removed, or does it mean that all topics will be removed and then the active topics list will be repopulated as records are consumed from new topics?
> > >
> > > The intention was to imply the former. But based on Randall's comment above, I'm changing the KIP to include a reset parameter in the PUT /connectors/{name}/config endpoint.
> > > In this case, the reset will be a complete reset for both source and sink connectors. This will help keep the behavior symmetric between the two connector types.
> >
> > I did see Randall's suggestion, but I was hoping we could retain some more intelligent behavior. Two things I'd like for us to avoid if possible:
> >
> > - Sinks consuming from infrequently-written topics unnecessarily dropping those topics, either as part of an explicit reset or an implicit one
> > - Sinks listing a topic in their active topics list that they are, in reality, no longer consuming from
> >
> > Intelligently filtering out no-longer-consumed topics from a sink connector's active topics list (instead of blanket resetting, or not resetting at all) would prevent both of those from happening. We could expand the
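The record key format listed in the update above (a single ':' between `topic-${topic name}` and `connector-${connector name}`) can be sketched like this. The helper names are hypothetical, and the sketch follows only the description in this thread; the final key layout in the KIP may differ.

```python
from typing import Tuple

TOPIC_PREFIX = "topic-"
CONNECTOR_PREFIX = "connector-"
SEPARATOR = ":"


def build_key(topic: str, connector: str) -> str:
    """Build a key of the form topic-<topic>:connector-<connector>."""
    if SEPARATOR in topic:
        # Legal Kafka topic names only use letters, digits, '.', '_' and '-'.
        raise ValueError(f"topic name must not contain {SEPARATOR!r}: {topic!r}")
    return f"{TOPIC_PREFIX}{topic}{SEPARATOR}{CONNECTOR_PREFIX}{connector}"


def parse_key(key: str) -> Tuple[str, str]:
    """Split a key back into (topic, connector)."""
    # Split on the FIRST separator only: the topic part cannot contain ':',
    # while this sketch makes no assumption about connector names.
    topic_part, sep, connector_part = key.partition(SEPARATOR)
    if (not sep
            or not topic_part.startswith(TOPIC_PREFIX)
            or not connector_part.startswith(CONNECTOR_PREFIX)):
        raise ValueError(f"not an active-topic key: {key!r}")
    return topic_part[len(TOPIC_PREFIX):], connector_part[len(CONNECTOR_PREFIX):]
```

Because ':' cannot appear in a topic name, putting the topic first keeps the key unambiguous even if a connector name happens to contain the separator.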
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks for the follow up Chris. Replies below:

On Fri, Jan 17, 2020 at 3:07 PM Christopher Egerton wrote:

> Thanks, Konstantine. Just a few more questions:
>
> > > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is there any anticipated case where it would be useful to have topic tracking enabled but with resets disabled? We could easily add this configuration later if a use case arises, but if we add it now it'll be difficult to remove.
> >
> > The motivation is for operators of a Connect cluster to be able to disable resetting the history of active topics altogether, while allowing at the same time to view the active sets.
>
> What I was trying to ask was, is there a use case for enabling the latter but not the former? We should be careful about adding extra worker configs and unless we can anticipate a reasonable scenario in which this would happen, we should err on the side of caution and avoid adding a config that would be difficult to remove later but, comparably, much easier to add.

The application use case is the ability to have immutable histories of topic usage or control when resets are allowed and how they are performed (e.g. resets could be allowed briefly during a maintenance phase and get disabled again).
I'm also never thrilled when I add an extra configuration parameter. However namespacing here will help with the extra cognitive burden. Similarly the defaults should cover most use cases too.

> > > 5. As far as automatic resets for sink connectors go, I agree with your reasoning about the inherent asymmetry between sinks and sources, and with the motivation to avoid confusing users by listing no-longer-consumed topics in the active topics for a sink connector. I think that this asymmetry is worth it to avoid a scenario where a connector is reconfigured to only consume from topic "foo" but, from a prior configuration, topic "bar" is still listed in its active topics.
> > > I do want to request clarification on the meaning of the phrase "any topics no longer consumed" as used under the header "Restarting, reconfiguring or deleting a connector". Does this mean that the current set of active topics for the connector will be filtered and any that are no longer contained in the sink connector's "topics" config or matched by its "topics.regex" config will be removed, or does it mean that all topics will be removed and then the active topics list will be repopulated as records are consumed from new topics?
> >
> > The intention was to imply the former. But based on Randall's comment above, I'm changing the KIP to include a reset parameter in the PUT /connectors/{name}/config endpoint.
> > In this case, the reset will be a complete reset for both source and sink connectors. This will help keep the behavior symmetric between the two connector types.
>
> I did see Randall's suggestion, but I was hoping we could retain some more intelligent behavior. Two things I'd like for us to avoid if possible:
>
> - Sinks consuming from infrequently-written topics unnecessarily dropping those topics, either as part of an explicit reset or an implicit one
> - Sinks listing a topic in their active topics list that they are, in reality, no longer consuming from
>
> Intelligently filtering out no-longer-consumed topics from a sink connector's active topics list (instead of blanket resetting, or not resetting at all) would prevent both of those from happening. We could expand the proposed reset parameter from a boolean to a three-option parameter with "none" (don't reset the topics list), "all" (reset the entire topics list), and "infer" (reset all topics if for a source, or intelligently filter out no-longer-consumed topics if for a sink).

I don't see significant advantages in the complexity that the three-value query parameter would introduce. Overall, resetting is included for convenience and is not essential to the main objective of this KIP, which is to track the topics used by a connector during its lifetime.

I think it's desirable to strike a good balance between the objective of tracking the topics used by connectors and keeping things simple. Given that the general programming model that Kafka Connect supports is that of continuous streams of events, representing active topics as either the topics that a connector has used since it was first created or as the topics that have been actively used since the latest reset is sufficient to cover a large majority of use cases. Furthermore, I see future KIPs that add features to topic tracking as preferable to future KIPs that try to remedy this first KIP with adjustments, simplifications and deprecation of features.

Therefore, the query parameter, which I'll add to the KIP shortly, can indeed be represented as a boolean. Absence
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks, Konstantine. Just a few more questions:

> > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is there any anticipated case where it would be useful to have topic tracking enabled but with resets disabled? We could easily add this configuration later if a use case arises, but if we add it now it'll be difficult to remove.
>
> The motivation is for operators of a Connect cluster to be able to disable resetting the history of active topics altogether, while allowing at the same time to view the active sets.

What I was trying to ask was, is there a use case for enabling the latter but not the former? We should be careful about adding extra worker configs and unless we can anticipate a reasonable scenario in which this would happen, we should err on the side of caution and avoid adding a config that would be difficult to remove later but, comparably, much easier to add.

> > 5. As far as automatic resets for sink connectors go, I agree with your reasoning about the inherent asymmetry between sinks and sources, and with the motivation to avoid confusing users by listing no-longer-consumed topics in the active topics for a sink connector. I think that this asymmetry is worth it to avoid a scenario where a connector is reconfigured to only consume from topic "foo" but, from a prior configuration, topic "bar" is still listed in its active topics.
> > I do want to request clarification on the meaning of the phrase "any topics no longer consumed" as used under the header "Restarting, reconfiguring or deleting a connector". Does this mean that the current set of active topics for the connector will be filtered and any that are no longer contained in the sink connector's "topics" config or matched by its "topics.regex" config will be removed, or does it mean that all topics will be removed and then the active topics list will be repopulated as records are consumed from new topics?
>
> The intention was to imply the former. But based on Randall's comment above, I'm changing the KIP to include a reset parameter in the PUT /connectors/{name}/config endpoint.
> In this case, the reset will be a complete reset for both source and sink connectors. This will help keep the behavior symmetric between the two connector types.

I did see Randall's suggestion, but I was hoping we could retain some more intelligent behavior. Two things I'd like for us to avoid if possible:

- Sinks consuming from infrequently-written topics unnecessarily dropping those topics, either as part of an explicit reset or an implicit one
- Sinks listing a topic in their active topics list that they are, in reality, no longer consuming from

Intelligently filtering out no-longer-consumed topics from a sink connector's active topics list (instead of blanket resetting, or not resetting at all) would prevent both of those from happening. We could expand the proposed reset parameter from a boolean to a three-option parameter with "none" (don't reset the topics list), "all" (reset the entire topics list), and "infer" (reset all topics if for a source, or intelligently filter out no-longer-consumed topics if for a sink).

Independent of your thoughts on the above, what will the default value for the newly-proposed parameter be? If resets are performed by default, the first scenario I outlined would become possible; if not, then the second would become possible. I'd lean towards performing them by default but would be interested in others' thoughts. (If the proposed "infer" value were the default, neither scenario would be an issue).

Cheers,

Chris

On Fri, Jan 17, 2020 at 2:01 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> Hey Chris! Thanks for the comments. Answers inline below:
>
> On Fri, Jan 17, 2020 at 11:47 AM Christopher Egerton wrote:
>
> > Hi Konstantine,
> >
> > Thanks for the KIP! There's been a lot of productive discussion so far so I'll try to keep my remarks brief.
> >
> > 1. As far as resetting the active topics for a connector goes, it's noted in the KIP that this can be done for a deleted connector. Can this also be done for connectors that were never created to begin with? What would the behavior be in this case? (Can this be clarified in the KIP?)
>
> Indeed, the intention is to keep reset as an independent and idempotent method.
> Keep in mind that a tombstone will be written to the topic if the in-memory view (of active topics) of the worker that serves the request contains this connector.
> This should at least prevent fake reset requests from filling up the topic with tombstone messages.
>
> > 2. What is the motivation for the `topic.tracking.allow.reset` config? Is there any anticipated case where it would be useful to have topic tracking enabled but with resets disabled? We could easily add this configuration later if a use case arises, but if we add it
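The three-option reset parameter that Chris proposes in this message ("none", "all", "infer") could be sketched roughly as below. This is only an illustration of the proposal under discussion, not an agreed design: `apply_reset` and its parameters are hypothetical, and Python's `re.fullmatch` is used as a stand-in for however a sink's "topics.regex" is actually matched.

```python
import re
from typing import Iterable, Optional, Set


def apply_reset(active_topics: Iterable[str],
                mode: str,
                connector_type: str,
                topics: Iterable[str] = (),
                topics_regex: Optional[str] = None) -> Set[str]:
    """Return the active-topics set that remains after a reset with the given mode."""
    active = set(active_topics)
    if mode == "none":
        return active  # keep the list untouched
    if mode == "all":
        return set()   # blanket reset
    if mode != "infer":
        raise ValueError(f"unknown reset mode: {mode!r}")
    if connector_type == "source":
        return set()   # "infer" behaves like "all" for sources
    if connector_type == "sink":
        listed = set(topics)
        pattern = re.compile(topics_regex) if topics_regex else None
        # Keep only topics the sink is still configured to consume.
        return {t for t in active
                if t in listed or (pattern is not None and pattern.fullmatch(t))}
    raise ValueError(f"unknown connector type: {connector_type!r}")
```

For a sink reconfigured from "foo" and "bar" to only "foo", "infer" would drop "bar" but keep "foo" even if "foo" is written infrequently, which covers both scenarios listed above.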
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hey Chris! Thanks for the comments. Answers inline below:

On Fri, Jan 17, 2020 at 11:47 AM Christopher Egerton wrote:

> Hi Konstantine,
>
> Thanks for the KIP! There's been a lot of productive discussion so far so
> I'll try to keep my remarks brief.
>
> 1. As far as resetting the active topics for a connector goes, it's noted
> in the KIP that this can be done for a deleted connector. Can this also be
> done for connectors that were never created to begin with? What would the
> behavior be in this case? (Can this be clarified in the KIP?)

Indeed, the intention is to keep reset as an independent and idempotent method. Keep in mind that a tombstone will be written to the topic only if the in-memory view (of active topics) of the worker that serves the request contains this connector. This should at least prevent fake reset requests from filling up the topic with tombstone messages.

> 2. What is the motivation for the `topic.tracking.allow.reset` config? Is
> there any anticipated case where it would be useful to have topic tracking
> enabled but with resets disabled? We could easily add this configuration
> later if a use case arises, but if we add it now it'll be difficult to
> remove.

The motivation is for operators of a Connect cluster to be able to disable resetting the history of active topics altogether, while still allowing users to view the active sets.

> 3. Nit - the JSON formatting in the value format/value example columns
> under the "Format of the new status record" heading is a little confusing.
> Assuming the top-level value is meant to be an object, it should be wrapped
> in braces ("{" and "}").

Good catch. Fixed.

> 4. The KIP focuses heavily on the use of the status topic for storage of
> connector topic information, but presumably we'd also want this information
> to be available in standalone mode. If this is the case, it'd be nice to
> tweak the language to refer explicitly to distributed mode when discussing
> the changes to the status topic and note (probably just in one place) that
> similar functionality will also be added to the standalone worker's
> in-memory status store.

It's true that the design is detailed w.r.t. what should happen in the KafkaStatusBackingStore, which is a Kafka-based implementation of the StatusBackingStore interface. This is intentional because this implementation influences and informs the semantics of topic tracking. I'd prefer not to make the language too abstract here. A KIP is not exactly a standard, and KIPs often discuss the impact of implementation on behavior (this KIP is a good example). But I'm happy to add a note to mention that these semantics will apply to standalone mode too.

> 5. As far as automatic resets for sink connectors go, I agree with your
> reasoning about the inherent asymmetry between sinks and sources, and with
> the motivation to avoid confusing users by listing no-longer-consumed
> topics in the active topics for a sink connector. I think this asymmetry
> is a price worth paying to avoid a scenario where a connector is
> reconfigured to only consume from topic "foo" but, from a prior
> configuration, topic "bar" is still listed in its active topics.
> I do want to request clarification on the meaning of the phrase "any topics
> no longer consumed" as used under the header "Restarting, reconfiguring or
> deleting a connector". Does this mean that the current set of active topics
> for the connector will be filtered and any that are no longer contained in
> the sink connector's "topics" config or matched by its "topics.regex" config
> will be removed, or does it mean that all topics will be removed and then
> the active topics list will be repopulated as records are consumed from new
> topics?

The intention was to imply the former.
But based on Randall's comment above, I'm changing the KIP to include a reset parameter in the PUT /connectors/{name}/config endpoint In this case, the reset will be a complete reset for both source and sink connectors. This will help keeping the behavior symmetric between the two connector types. Best, Konstantine > Cheers, > > Chris > > On Thu, Jan 16, 2020 at 1:51 PM Konstantine Karantasis < > konstant...@confluent.io> wrote: > > > Thanks for new comments Randall. Following up with my replies inline > below. > > I'll also go ahead and update the KIP with the suggestions that are > > outstanding right now and post a summary of the changes. > > > > On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch wrote: > > > > > My responses are inline: > > > > > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < > > > konstant...@confluent.io> wrote: > > > > > > > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll > > reply > > > in > > > > separate emails, in order. > > > > > > > > First, to Randall's comments, I'm replying below with a reference to > > the > > > > comment number: > > > > > > > > 1. Although I can imagine we'd be interested in adding additional > > > metadata >
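The reset semantics described in this reply (idempotent, and writing tombstones only when the serving worker's in-memory view knows the connector) can be illustrated with a small Python sketch. This is a toy model and not the Connect implementation: the class `ActiveTopicsView`, its methods and the list standing in for the status topic are all hypothetical names, and the in-memory view is updated directly rather than by reading records back from Kafka. The key format follows the ':' separator proposed elsewhere in this thread.

```python
class ActiveTopicsView:
    """Toy model of one worker's in-memory view of active topics (hypothetical)."""

    def __init__(self):
        self.active = {}      # connector name -> set of active topic names
        self.status_log = []  # stand-in for the status topic: (key, value) pairs

    def record(self, connector, topic):
        # Simplification: update the view directly when a topic is first seen.
        self.active.setdefault(connector, set()).add(topic)

    def reset(self, connector):
        # Only connectors present in the in-memory view produce tombstones,
        # so repeated or bogus reset requests write nothing.
        topics = self.active.pop(connector, set())
        for topic in sorted(topics):
            key = f"topic-{topic}:connector-{connector}"
            self.status_log.append((key, None))  # None models a tombstone
        return len(topics)


view = ActiveTopicsView()
view.record("C1", "test-topic")
first = view.reset("C1")         # writes one tombstone
second = view.reset("C1")        # idempotent: nothing left to reset
unknown = view.reset("never-created")
```

In this model a reset for a connector that was never created is a no-op, which matches the stated goal of not filling the topic with tombstones.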
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi Konstantine,

Thanks for the KIP! There's been a lot of productive discussion so far so I'll try to keep my remarks brief.

1. As far as resetting the active topics for a connector goes, it's noted in the KIP that this can be done for a deleted connector. Can this also be done for connectors that were never created to begin with? What would the behavior be in this case? (Can this be clarified in the KIP?)

2. What is the motivation for the `topic.tracking.allow.reset` config? Is there any anticipated case where it would be useful to have topic tracking enabled but with resets disabled? We could easily add this configuration later if a use case arises, but if we add it now it'll be difficult to remove.

3. Nit - the JSON formatting in the value format/value example columns under the "Format of the new status record" heading is a little confusing. Assuming the top-level value is meant to be an object, it should be wrapped in braces ("{" and "}").

4. The KIP focuses heavily on the use of the status topic for storage of connector topic information, but presumably we'd also want this information to be available in standalone mode. If this is the case, it'd be nice to tweak the language to refer explicitly to distributed mode when discussing the changes to the status topic and note (probably just in one place) that similar functionality will also be added to the standalone worker's in-memory status store.

5. As far as automatic resets for sink connectors go, I agree with your reasoning about the inherent asymmetry between sinks and sources, and with the motivation to avoid confusing users by listing no-longer-consumed topics in the active topics for a sink connector. I think this asymmetry is a price worth paying to avoid a scenario where a connector is reconfigured to only consume from topic "foo" but, from a prior configuration, topic "bar" is still listed in its active topics.
I do want to request clarification on the meaning of the phrase "any topics no longer consumed" as used under the header "Restarting, reconfiguring or deleting a connector". Does this mean that the current set of active topics for the connector will be filtered and any that are longer contained in the sink connector's "topics" config or matched by its "topics.regex" config will be removed, or does it mean that all topics will be removed and then the active topics list will be repopulated as records are consumed from new topics? Cheers, Chris On Thu, Jan 16, 2020 at 1:51 PM Konstantine Karantasis < konstant...@confluent.io> wrote: > Thanks for new comments Randall. Following up with my replies inline below. > I'll also go ahead and update the KIP with the suggestions that are > outstanding right now and post a summary of the changes. > > On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch wrote: > > > My responses are inline: > > > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < > > konstant...@confluent.io> wrote: > > > > > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll > reply > > in > > > separate emails, in order. > > > > > > First, to Randall's comments, I'm replying below with a reference to > the > > > comment number: > > > > > > 1. Although I can imagine we'd be interested in adding additional > > metadata > > > in the record value, I didn't see the need for a timestamp in this > first > > > draft. > > > Now that you mention, the way I'd interpret a timestamp in the > connector > > > status record value would be as an approximation of since when this > > > connector has been using this topic. > > > Happy to add this if we think this info is useful. Of course, accuracy > of > > > this information depends on message retention in Kafka and on how long > > the > > > workers have been running without a restart, so this might make this > > > approximation less useful if it gets recomputed from time to time. 
> > > To your reference in "Recording active topics" I'll reply below, > because > > > that's Tom's question too. > > > > > > > Makes sense that the timestamp in the connector is the (approximate) time > > that the connector has been using the topic. I do think it's worth adding > > in the record value (not relying upon Kafka record timestamp). > > > > Regarding "message retention", by default Connect creates the status > topic > > with compaction but no deletion policy, which means infinite retention. > > Don't several things become problematic if finite retention is used on > the > > status topic, or do we need to worry about this for the active topic > > records. Do we need to periodically rewrite all of the active topic > > records? If so, we could just write new records using the original > > timestamp as originally read by the worker. If the worker does > periodically > > (maybe just on task startup) rewrite the active topic records, then we'd > > have to be sure about the semantics of and interplay with concurrent > > explicit "reset" calls. > > > > Good point. These topics are configured to have
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks for new comments Randall. Following up with my replies inline below. I'll also go ahead and update the KIP with the suggestions that are outstanding right now and post a summary of the changes. On Wed, Jan 15, 2020 at 2:37 PM Randall Hauch wrote: > My responses are inline: > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < > konstant...@confluent.io> wrote: > > > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll reply > in > > separate emails, in order. > > > > First, to Randall's comments, I'm replying below with a reference to the > > comment number: > > > > 1. Although I can imagine we'd be interested in adding additional > metadata > > in the record value, I didn't see the need for a timestamp in this first > > draft. > > Now that you mention, the way I'd interpret a timestamp in the connector > > status record value would be as an approximation of since when this > > connector has been using this topic. > > Happy to add this if we think this info is useful. Of course, accuracy of > > this information depends on message retention in Kafka and on how long > the > > workers have been running without a restart, so this might make this > > approximation less useful if it gets recomputed from time to time. > > To your reference in "Recording active topics" I'll reply below, because > > that's Tom's question too. > > > > Makes sense that the timestamp in the connector is the (approximate) time > that the connector has been using the topic. I do think it's worth adding > in the record value (not relying upon Kafka record timestamp). > > Regarding "message retention", by default Connect creates the status topic > with compaction but no deletion policy, which means infinite retention. > Don't several things become problematic if finite retention is used on the > status topic, or do we need to worry about this for the active topic > records. Do we need to periodically rewrite all of the active topic > records? 
If so, we could just write new records using the original > timestamp as originally read by the worker. If the worker does periodically > (maybe just on task startup) rewrite the active topic records, then we'd > have to be sure about the semantics of and interplay with concurrent > explicit "reset" calls. > Good point. These topics are configured to have infinite retention. I'll add the timestamp as type 'long'. > > > > > 2. I'll explain with an example, that maybe is worth adding to the KIP > > because what's expected to happen might not be as obvious as I thought > when > > a new topic is recorded. > > Let's say we have two workers, W1 and W2, each running two worker tasks > T11 > > T12 and T21 T22 respectively associated with a connector C1. All tasks > will > > run producers that will produce records to the same topic, "test-topic". > > When the connector starts, both workers track this connector's set of > > active topics as empty. Given the absence of synchronization (that's > good) > > in how this information is recorded and persisted in the status topic, > all > > four tasks might race to record status messages: > > > > For example: > > > > T11, running at worker W1, will send Kafka records with: > > key: topic-test-topic-connector-C1 > > value: "topic": { "connector": "some-source", "task": > "some-source-TT11", > > "name": "test-topic" } > > > > and T22, running at worker W2, will send Kafka records with: > > key: topic-test-topic-connector-C1 > > value: "topic": { "connector": "some-source", "task": > "some-source-TT22", > > "name": "test-topic" } > > > > (similarly tasks T12 and T21 might send topic status records). > > > > These four records (they might not even be four but there's going to be > at > > least one) may be written in any order. Because the topic is compacted > and > > these records have the same key, eventually only one message will be > > retained. > > The task ID of that message will be the ID of the task that wrote last. 
I > > can see this being used mostly for troubleshooting. > > > > Thanks for the clarification. Might be good to clarify the language a bit > more, though I'm not convinced an example is really needed. > I'll try to see how they both fit. Sure. > > > > > > 3. I believe across the whole KIP, when I'm referring to the task > entity, I > > imply the worker task. Not the user code that is running as > implementation > > of the SourceTask or SinkTask abstract classes. Didn't want to increase > > complexity by referring to a task as worker task. > > But I see your point and I'm going to prefer the terms "worker" and > "worker > > task" to highlight that it's the framework that is aware of this feature > > and not the user code. > > > > Thank you. > +1 > > > > > > 4. I assumed that absence of changes to the public API would indicate > that > > these interfaces/abstract classes remain unchanged. But definitely it's > > worth to explicitly mention that. > > > > Thanks! > > +1 > > > > > 5. That is correct. My
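The race described in point 2 of the message above (several worker tasks writing a status record with the same key, and compaction eventually keeping only the last one) can be sketched in Python as follows. This is a simplified simulation, not Kafka code: `compact` and `status_record` are hypothetical helpers, the list `log` stands in for one partition of the status topic, and the write order is picked arbitrarily. The key uses the format quoted in the example; the connector name "C1" in the value is an assumption made to match that key.

```python
def compact(log):
    """Keep the latest value per key; a None value (tombstone) removes the key."""
    latest = {}
    for key, value in log:
        if value is None:
            latest.pop(key, None)
        else:
            latest[key] = value
    return latest


KEY = "topic-test-topic-connector-C1"


def status_record(task):
    # Value layout modeled on the example in the message above.
    return {"topic": {"connector": "C1", "task": task, "name": "test-topic"}}


# Four tasks race; the order below is one of many possible interleavings.
log = [(KEY, status_record(task)) for task in ["T21", "T11", "T22", "T12"]]
compacted = compact(log)
```

Whatever the interleaving, a single record survives for the key, and its task ID is that of the task that wrote last.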
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks for the follow up Tom. Replies inline below this time:

On Thu, Jan 16, 2020 at 1:16 AM Tom Bentley wrote:

> Hi Konstantine,
>
> > b) I thought hard but briefly about this. Although there's potential for
> > ambiguity if someone inspects the key on its own, I believe that if we
> > combine this information with what we save in the value, the ambiguity is
> > removed. With respect to whether a connector can override another
> > connector's key, I don't think that's possible for any combination of
> > unique topic name and connector name. I hope I'm not missing anything.
>
> I think it is possible for connectors to have conflicting keys:
>
> * topic-name="foo-connector" and connector-name="bar"
> * topic-name="foo" and connector-name="connector-bar"
>
> would both have the key "topic-foo-connector-connector-bar"

Indeed. Thanks for the pointer Tom. A collision is not as unlikely as I initially thought. I see roughly two solutions:

a) Use a special character as a delimiter. I initially wanted to avoid this, but given that topic names have quite restrictive rules with respect to their allowed character set, the colon character ':' could be used to separate the topic from the connector name. Connector names are more flexible, but the restrictions on the topic name will suffice. I suggest keeping the 'topic-' prefix before the delimiter and the 'connector-' prefix after it.

b) Use JSON to encode the key.

In order to be more consistent with the format of existing keys in the status topic, I'm suggesting to adopt option (a) - use ':' as a separator - at the moment. If at some point we decide to start using JSON for the record keys of internal Connect topics, we should do it in a more concerted way and in a separate KIP.

> > c) That means that worker tasks will produce topic status records once they detect that the worker they are running on does not include a topic that is used by the tasks in the list of active topics for this connector.
But > once > > the worker adds this topic to the set of active topics for this > connector, > > then the worker tasks stop producing those messages. They'll produce one > > again if the worker stops including that topic in the active topics set > for > > some reason. I could improve the wording on the KIP, but I'd also like to > > know we are on the same page re: the design here. > > > > OK, I understand now. I think it would be helpful to include the example > you gave in your previous reply. > Definitely. Will add the example. > Many thanks, > > Tom > > I've updated the KIP to mention the delimiter and using a metric to track active topics as a rejected alternative. Following up with the rest of the changes shortly. Cheers, Konstantine > > > > Let me know if the above address your questions. > > Thanks, > > Konstantine > > > > > > > > On Wed, Jan 15, 2020 at 12:05 PM Randall Hauch wrote: > > > > > Almog, > > > > > > You raise some interesting questions. Comments inline below. > > > > > > On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra > wrote: > > > > > > > Hi Konstantine, > > > > > > > > Thanks for the KIP! This is going to make automatic integration with > > > > Connect much more powerful. > > > > > > > > My thoughts are mostly around freshness of the data and being able to > > > > expose that to users. Riffing on Randall's timestamp question - have > we > > > > considered adding some interval at which point a connector will > > republish > > > > any topics that it encounters and update the timestamp? That way we > > have > > > > some refreshing mechanism that isn't as powerful as the complete > reset > > > > (which may not be practical in many scenarios). 
> > > > > > > > > > My question about recording the timestamp at which each active topic > > record > > > were (infrequently) written was more about making a bit more > information > > > available given the current design, and whether recording a bit more > > > information (for very little additional storage cost and no extra > runtime > > > cost) may be worth it if in the future we figure out how to use this > > > information. > > > > > > I think it's more complicated to try to record the history of when > topics > > > were most recently used, since that requires recording a lot more > active > > > topic records than the current proposal. Besides, it's not unexpected > > that > > > source and sink connectors sometimes don't use topics for periods of > > time. > > > A sink connector only consumes from a topic when there are additional > > > records to consume, and a source connector only needs to write to a > topic > > > when there is information in the upstream system targeted to that > topic. > > An > > > example of the latter is that most database connectors will write to a > > > topic for a particular table only when rows in that table have changed. > > > > > > > > > > I also agree with Randall's other point (Would it be better to not > > > > automatically reset connector's active topics when a sink connector >
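The key collision Tom pointed out, and the ':' delimiter adopted in response, can be checked with a short Python sketch. The function names are hypothetical; the two key layouts are the ones quoted in this thread. Parsing splits on the first ':' only, on the assumption (stated in the reply) that topic names cannot contain a colon while connector names might.

```python
def ambiguous_key(topic, connector):
    # Original layout: '-' is also legal inside topic and connector names.
    return f"topic-{topic}-connector-{connector}"


def delimited_key(topic, connector):
    # Proposed layout: ':' separates the topic part from the connector part.
    return f"topic-{topic}:connector-{connector}"


def parse_delimited_key(key):
    # Split on the first ':' only, since the topic part never contains one.
    topic_part, connector_part = key.split(":", 1)
    return topic_part[len("topic-"):], connector_part[len("connector-"):]


collision = ambiguous_key("foo-connector", "bar") == ambiguous_key("foo", "connector-bar")
distinct = delimited_key("foo-connector", "bar") != delimited_key("foo", "connector-bar")
```

With the delimiter, the two (topic, connector) pairs from Tom's example map to different keys and each key can be parsed back unambiguously.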
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi Konstantine, b) I thought hard but briefly about this. Although there's potential for > ambiguity if someone inspects the key on its own, I believe that if we > combine this information with what we save in the value, the ambiguity is > removed. With respect to whether a connector can override another > connector's key, I don't think that's possible for any combination of > unique topic name and connector name. I hope I'm not missing anything. > I think it is possible for connectors to have conflicting keys: * topic-name="foo-connector" and connector-name="bar" * topic-name="foo" and connector-name="connector-bar" would both have the key "topic-foo-connector-connector-bar" > c) That means that worker tasks will produce topic status records once they > detect that the worker they are running on does not include a topic that is > used by the tasks in the list of active topics for this connector. But once > the worker adds this topic to the set of active topics for this connector, > then the worker tasks stop producing those messages. They'll produce one > again if the worker stops including that topic in the active topics set for > some reason. I could improve the wording on the KIP, but I'd also like to > know we are on the same page re: the design here. > OK, I understand now. I think it would be helpful to include the example you gave in your previous reply. Many thanks, Tom > > Let me know if the above address your questions. > Thanks, > Konstantine > > > > On Wed, Jan 15, 2020 at 12:05 PM Randall Hauch wrote: > > > Almog, > > > > You raise some interesting questions. Comments inline below. > > > > On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra wrote: > > > > > Hi Konstantine, > > > > > > Thanks for the KIP! This is going to make automatic integration with > > > Connect much more powerful. > > > > > > My thoughts are mostly around freshness of the data and being able to > > > expose that to users. 
Riffing on Randall's timestamp question - have we > > > considered adding some interval at which point a connector will > republish > > > any topics that it encounters and update the timestamp? That way we > have > > > some refreshing mechanism that isn't as powerful as the complete reset > > > (which may not be practical in many scenarios). > > > > > > > My question about recording the timestamp at which each active topic > record > > were (infrequently) written was more about making a bit more information > > available given the current design, and whether recording a bit more > > information (for very little additional storage cost and no extra runtime > > cost) may be worth it if in the future we figure out how to use this > > information. > > > > I think it's more complicated to try to record the history of when topics > > were most recently used, since that requires recording a lot more active > > topic records than the current proposal. Besides, it's not unexpected > that > > source and sink connectors sometimes don't use topics for periods of > time. > > A sink connector only consumes from a topic when there are additional > > records to consume, and a source connector only needs to write to a topic > > when there is information in the upstream system targeted to that topic. > An > > example of the latter is that most database connectors will write to a > > topic for a particular table only when rows in that table have changed. > > > > > > > I also agree with Randall's other point (Would it be better to not > > > automatically reset connector's active topics when a sink connector is > > > restarted?). I think keeping the behavior as symmetrical between sink > and > > > source connectors is a good idea. > > > > > > Lastly, with regards to the API, I can imagine it is also pretty useful > > to > > > answer the inverse question: "which connectors write to topic X". 
> Perhaps > > > we can achieve this by letting the users compute it and just expose an > > API > > > that returns the entire mapping at once (instead of needing to call the > > > /connectors/{name}/topics endpoint for each connector). > > > > > > It may be worth considering a method such as /topics that would produce a > > result that is an aggregate of the potentially many > > /connectors/{name}/topic responses, especially if the result of the > /topics > > is an array of the individual responses from /connectors/{name}/topic. > The > > benefit of a single request is that the answer to "which connectors use > > topic X" can be computed using simple tools such as 'jq'. And, if tooling > > frequently fetches the active topic information for all connectors, > > providing the aggregate method would reduce the load on the tooling and > the > > server. It may also be relatively easy to implement. > > > > > > > Otherwise, looks good to me! Hits the requirements that I had in mind > on > > > the nose. > > > - Almog > > > > > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley > wrote: > > > > > > > Hi Konstantine, > > > > > > > > Thanks for
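The inverse question raised above ("which connectors use topic X") can be answered client-side once the per-connector active topics have been collected. The Python sketch below assumes the results were already fetched and gathered into a plain dict of connector name to topic list; that input shape, the function name `connectors_by_topic` and the sample connector names are illustrative assumptions, not the response format defined by the KIP.

```python
def connectors_by_topic(active_topics):
    """Invert a connector -> topics mapping into topic -> set of connectors."""
    inverse = {}
    for connector, topics in active_topics.items():
        for topic in topics:
            inverse.setdefault(topic, set()).add(connector)
    return inverse


# Hypothetical data, as if gathered with one request per connector.
collected = {
    "jdbc-source": ["orders", "customers"],
    "s3-sink": ["orders"],
    "idle-connector": [],
}
by_topic = connectors_by_topic(collected)
```

An aggregate endpoint would save the per-connector round trips, but the inversion itself stays this simple either way.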
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hey Almog, thanks for the comments! Here's my take:

1) I think that an approximate grouping of topics into highly-active/active/inactive (because a precise one would be too expensive) seems like something we could leave out of this first version of topic tracking. Interestingly, as you point out, given a topic with continuous flow, you may get a similar view by resetting and subsequently querying the topics endpoint. As I understand Randall's comment, persisting timestamps would have more to do with knowing approximately when this connector first started using its topics. But given the implications that topic retention might have, I suggest that we leave timestamp recording out at this point.

2) As I described in my previous answer above (#6 on my first reply to Randall's comments), this asymmetry is inherited from the sink connector's configuration. I'm ok not resetting the topics for both upon reconfiguration, since this will result in simpler code. But I'd like to know that we are ok with a sink connector showing topics in its active set that are not in its current configuration (unless a reset request is issued).

3) A similar idea crossed my mind while I was thinking about how the "/connectors" endpoint evolved with KIP-465 to show a roll up of the status of the tasks of all the connectors. However, what you describe here would probably require an additional top-level "/topics" endpoint and more complex filtering based on permissions. I'd suggest punting this feature, unless people think it is a really nice-to-have. In the meantime, as you mention, it is something that can be constructed with consecutive queries after an application gets the list of connectors running in the Connect cluster.

Cheers,
Konstantine

On Wed, Jan 15, 2020 at 2:41 PM Randall Hauch wrote: > On Wed, Jan 15, 2020 at 4:36 PM Randall Hauch wrote: > > > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < > > konstant...@confluent.io> wrote: > > > >> > >> 9.
I assumed that partitioning is implied by default, because there's no > >> requirement for complete ordering of topic status records. But I'll add > >> this fact as a separate bullet. The status.storage.topic is already a > >> partitioned topic. > >> > > > > Agreed. I think it'd be sufficient to simply mention that partition will > > be chosen based upon the active topic records' keys, ensuring that all > > active topic records for the same connector will be written to the same > > partition and will be totally ordered. > > > > Well, my previous statement is not quite right. All topic records for the > same *connector and topic* will be written to the same partition and will > be totally ordered. But as you pointed out, it doesn't really matter, other > than that this feature will work with any # of partitions. The new bullet > you described would be sufficient. :-D > > > > > >> > >> I'm following up with the rest of the comments, shortly. > >> Thanks, > >> Konstantine > >> > >> > >> On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra wrote: > >> > >> > Hi Konstantine, > >> > > >> > Thanks for the KIP! This is going to make automatic integration with > >> > Connect much more powerful. > >> > > >> > My thoughts are mostly around freshness of the data and being able to > >> > expose that to users. Riffing on Randall's timestamp question - have > we > >> > considered adding some interval at which point a connector will > >> republish > >> > any topics that it encounters and update the timestamp? That way we > have > >> > some refreshing mechanism that isn't as powerful as the complete reset > >> > (which may not be practical in many scenarios). > >> > > >> > I also agree with Randall's other point (Would it be better to not > >> > automatically reset connector's active topics when a sink connector is > >> > restarted?). I think keeping the behavior as symmetrical between sink > >> and > >> > source connectors is a good idea. 
> >> > > >> > Lastly, with regards to the API, I can imagine it is also pretty > useful > >> to > >> > answer the inverse question: "which connectors write to topic X". > >> Perhaps > >> > we can achieve this by letting the users compute it and just expose an > >> API > >> > that returns the entire mapping at once (instead of needing to call > the > >> > /connectors/{name}/topics endpoint for each connector). > >> > > >> > Otherwise, looks good to me! Hits the requirements that I had in mind > on > >> > the nose. > >> > - Almog > >> > > >> > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley > >> wrote: > >> > > >> > > Hi Konstantine, > >> > > > >> > > Thanks for the KIP, I can see how it could be useful. > >> > > > >> > > a) Did you consider using a metric for this? I don't think it would > >> > satisfy > >> > > all the use cases you have in mind, but you could mention it in the > >> > > rejected alternatives. > >> > > > >> > > b) If the topic name contains the string "-connector" then the key > >> format > >> > > is ambiguous. This isn't
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
On Wed, Jan 15, 2020 at 4:36 PM Randall Hauch wrote: > On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < > konstant...@confluent.io> wrote: > >> >> 9. I assumed that partitioning is implied by default, because there's no >> requirement for complete ordering of topic status records. But I'll add >> this fact as a separate bullet. The status.storage.topic is already a >> partitioned topic. >> > > Agreed. I think it'd be sufficient to simply mention that partition will > be chosen based upon the active topic records' keys, ensuring that all > active topic records for the same connector will be written to the same > partition and will be totally ordered. > Well, my previous statement is not quite right. All topic records for the same *connector and topic* will be written to the same partition and will be totally ordered. But as you pointed out, it doesn't really matter, other than that this feature will work with any # of partitions. The new bullet you described would be sufficient. :-D > >> >> I'm following up with the rest of the comments, shortly. >> Thanks, >> Konstantine >> >> >> On Wed, Jan 15, 2020 at 9:19 AM Almog Gavra wrote: >> >> > Hi Konstantine, >> > >> > Thanks for the KIP! This is going to make automatic integration with >> > Connect much more powerful. >> > >> > My thoughts are mostly around freshness of the data and being able to >> > expose that to users. Riffing on Randall's timestamp question - have we >> > considered adding some interval at which point a connector will >> republish >> > any topics that it encounters and update the timestamp? That way we have >> > some refreshing mechanism that isn't as powerful as the complete reset >> > (which may not be practical in many scenarios). >> > >> > I also agree with Randall's other point (Would it be better to not >> > automatically reset connector's active topics when a sink connector is >> > restarted?). 
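The partitioning point in this message (all records with the same key land on the same partition, whatever the partition count) can be sketched as below. This is only an illustration under stated assumptions: Kafka's default partitioner hashes the serialized key with murmur2, while this sketch substitutes `zlib.crc32` as a deterministic stand-in, and `partition_for` is a hypothetical helper, not a Connect or Kafka API. The key string follows the example format used elsewhere in this thread.

```python
import zlib


def partition_for(key: str, num_partitions: int) -> int:
    """Hypothetical key-hash partitioner (stand-in for Kafka's murmur2-based one)."""
    return zlib.crc32(key.encode("utf-8")) % num_partitions


# Every record for the same connector and topic carries the same key,
# so repeated writes always map to one partition and stay ordered there.
key = "topic-test-topic-connector-C1"
partitions_seen = {partition_for(key, 5) for _ in range(4)}
```

Because the mapping depends only on the key and the partition count, the scheme works for any number of partitions, which is the property the new KIP bullet needs to state.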
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
My responses are inline: On Wed, Jan 15, 2020 at 2:05 PM Konstantine Karantasis < konstant...@confluent.io> wrote: > Hi Randall, Tom and Almog. I'm excited to read your comments. I'll reply in > separate emails, in order. > > First, to Randall's comments, I'm replying below with a reference to the > comment number: > > 1. Although I can imagine we'd be interested in adding additional metadata > in the record value, I didn't see the need for a timestamp in this first > draft. > Now that you mention, the way I'd interpret a timestamp in the connector > status record value would be as an approximation of since when this > connector has been using this topic. > Happy to add this if we think this info is useful. Of course, accuracy of > this information depends on message retention in Kafka and on how long the > workers have been running without a restart, so this might make this > approximation less useful if it gets recomputed from time to time. > To your reference in "Recording active topics" I'll reply below, because > that's Tom's question too. > Makes sense that the timestamp in the connector is the (approximate) time that the connector has been using the topic. I do think it's worth adding in the record value (not relying upon Kafka record timestamp). Regarding "message retention", by default Connect creates the status topic with compaction but no deletion policy, which means infinite retention. Don't several things become problematic if finite retention is used on the status topic, or do we need to worry about this for the active topic records. Do we need to periodically rewrite all of the active topic records? If so, we could just write new records using the original timestamp as originally read by the worker. If the worker does periodically (maybe just on task startup) rewrite the active topic records, then we'd have to be sure about the semantics of and interplay with concurrent explicit "reset" calls. > > 2. 
I'll explain with an example, that maybe is worth adding to the KIP > because what's expected to happen might not be as obvious as I thought when > a new topic is recorded. > Let's say we have two workers, W1 and W2, each running two worker tasks T11 > T12 and T21 T22 respectively associated with a connector C1. All tasks will > run producers that will produce records to the same topic, "test-topic". > When the connector starts, both workers track this connector's set of > active topics as empty. Given the absence of synchronization (that's good) > in how this information is recorded and persisted in the status topic, all > four tasks might race to record status messages: > > For example: > > T11, running at worker W1, will send Kafka records with: > key: topic-test-topic-connector-C1 > value: "topic": { "connector": "some-source", "task": "some-source-TT11", > "name": "test-topic" } > > and T22, running at worker W2, will send Kafka records with: > key: topic-test-topic-connector-C1 > value: "topic": { "connector": "some-source", "task": "some-source-TT22", > "name": "test-topic" } > > (similarly tasks T12 and T21 might send topic status records). > > These four records (they might not even be four but there's going to be at > least one) may be written in any order. Because the topic is compacted and > these records have the same key, eventually only one message will be > retained. > The task ID of that message will be the ID of the task that wrote last. I > can see this being used mostly for troubleshooting. > Thanks for the clarification. Might be good to clarify the language a bit more, though I'm not convinced an example is really needed. > > 3. I believe across the whole KIP, when I'm referring to the task entity, I > imply the worker task. Not the user code that is running as implementation > of the SourceTask or SinkTask abstract classes. Didn't want to increase > complexity by referring to a task as worker task. 
> But I see your point and I'm going to prefer the terms "worker" and "worker > task" to highlight that it's the framework that is aware of this feature > and not the user code. > Thank you. > > 4. I assumed that absence of changes to the public API would indicate that > these interfaces/abstract classes remain unchanged. But definitely it's > worth to explicitly mention that. > Thanks! > > 5. That is correct. My intention is to make reset work well with the > streaming programming model. Resetting (which btw is not mandatory) means > that you are cleaning the slate for a connector that is currently running, > and its currently active topics will soon be populated from scratch because > new records will be produced or consumed. > But resetting is not required. I see it more like a useful operation, in > case users want to clean the active topics history, without having to > delete a connector, since delete has further implications in the > connector's progress tracking. > I do think it's worth trying to clarify in the document what happens when active
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi Tom,

here are my replies to your comments:

a) Interesting point. It's indeed worth adding to the "Rejected Alternatives" section. I did not consider it as an implementation option. Connect already has a REST API that, as you note, seems a more natural choice, especially since it's already in place and this feature fits well under the connector endpoint namespace. See KIP-495 for an interesting discussion on doubling down on the initial design decision to use the REST API instead of JMX for similar things.

b) I thought hard but briefly about this. Although there's potential for ambiguity if someone inspects the key on its own, I believe that if we combine this information with what we save in the value, the ambiguity is removed. With respect to whether a connector can override another connector's key, I don't think that's possible for any combination of unique topic name and connector name. I hope I'm not missing anything.

c) That means that worker tasks will produce topic status records once they detect that the worker they are running on does not include a topic that is used by the tasks in the list of active topics for this connector. But once the worker adds this topic to the set of active topics for this connector, the worker tasks stop producing those messages. They'll produce one again if the worker stops including that topic in the active topics set for some reason. I could improve the wording in the KIP, but I'd also like to know we are on the same page re: the design here.

Let me know if the above addresses your questions.

Thanks,
Konstantine

On Wed, Jan 15, 2020 at 12:05 PM Randall Hauch wrote:

> Almog,
>
> You raise some interesting questions. Comments inline below.
>
> On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra wrote:
>
> > Hi Konstantine,
> >
> > Thanks for the KIP! This is going to make automatic integration with Connect much more powerful.
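Point c) in Konstantine's reply to Tom describes when worker tasks post topic status records and when they stop. A minimal sketch of that behavior follows, assuming a simplified in-memory model: the `Worker` class, its method names, and the `posted` list (standing in for status.storage.topic) are all hypothetical and are not Connect classes.

```python
class Worker:
    """Hypothetical model of one Connect worker's view of active topics."""

    def __init__(self):
        self.active_topics = {}  # connector name -> set of topic names
        self.posted = []         # stand-in for records sent to status.storage.topic

    def task_saw_topic(self, connector, task, topic):
        """Called by a worker task per record; posts only for untracked topics."""
        if topic in self.active_topics.get(connector, set()):
            return False  # already tracked: no further update records
        self.posted.append({"connector": connector, "task": task, "name": topic})
        return True

    def read_status_record(self, record):
        """Called when the worker reads a topic status record back."""
        self.active_topics.setdefault(record["connector"], set()).add(record["name"])


worker = Worker()
first = worker.task_saw_topic("C1", "C1-T11", "test-topic")   # posts
second = worker.task_saw_topic("C1", "C1-T12", "test-topic")  # may also post: nothing read back yet
worker.read_status_record(worker.posted[0])
third = worker.task_saw_topic("C1", "C1-T11", "test-topic")   # suppressed from now on
```

The second call shows why more than one record can be written for the same connector and topic: suppression only starts once the worker has observed the topic in its own active set.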
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi Randall, Tom and Almog.

I'm excited to read your comments. I'll reply in separate emails, in order.

First, to Randall's comments, I'm replying below with a reference to the comment number:

1. Although I can imagine we'd be interested in adding additional metadata in the record value, I didn't see the need for a timestamp in this first draft. Now that you mention it, the way I'd interpret a timestamp in the connector status record value would be as an approximation of how long this connector has been using this topic. Happy to add this if we think this info is useful. Of course, the accuracy of this information depends on message retention in Kafka and on how long the workers have been running without a restart, so the approximation might be less useful if it gets recomputed from time to time. To your reference in "Recording active topics" I'll reply below, because that's Tom's question too.

2. I'll explain with an example, which may be worth adding to the KIP because what's expected to happen when a new topic is recorded might not be as obvious as I thought. Let's say we have two workers, W1 and W2, each running two worker tasks, T11 T12 and T21 T22 respectively, associated with a connector C1. All tasks will run producers that will produce records to the same topic, "test-topic". When the connector starts, both workers track this connector's set of active topics as empty. Given the absence of synchronization (that's good) in how this information is recorded and persisted in the status topic, all four tasks might race to record status messages.

For example:

T11, running at worker W1, will send Kafka records with:
key: topic-test-topic-connector-C1
value: "topic": { "connector": "some-source", "task": "some-source-TT11", "name": "test-topic" }

and T22, running at worker W2, will send Kafka records with:
key: topic-test-topic-connector-C1
value: "topic": { "connector": "some-source", "task": "some-source-TT22", "name": "test-topic" }

(similarly tasks T12 and T21 might send topic status records).

These four records (there might not even be four, but there's going to be at least one) may be written in any order. Because the topic is compacted and these records have the same key, eventually only one message will be retained. The task ID of that message will be the ID of the task that wrote last. I can see this being used mostly for troubleshooting.

3. I believe that across the whole KIP, when I refer to the task entity, I mean the worker task, not the user code that runs as an implementation of the SourceTask or SinkTask abstract classes. I didn't want to increase complexity by referring to a task as a worker task. But I see your point and I'm going to prefer the terms "worker" and "worker task" to highlight that it's the framework that is aware of this feature and not the user code.

4. I assumed that the absence of changes to the public API would indicate that these interfaces/abstract classes remain unchanged. But it's definitely worth mentioning explicitly.

5. That is correct. My intention is to make reset work well with the streaming programming model. Resetting (which btw is not mandatory) means that you are cleaning the slate for a connector that is currently running, and its currently active topics will soon be populated from scratch because new records will be produced or consumed. But resetting is not required. I see it more as a useful operation, in case users want to clean the active topics history without having to delete a connector, since delete has further implications for the connector's progress tracking.

6. I fixed the typo - thanks! I'm very much in favor of preserving symmetry between the two connector types. This definitely has more long-term benefits and may help to avoid confusion. However, the asymmetry here is inherited from the asymmetry that exists today between source and sink connectors. Source connectors don't list topics in their configurations but sink connectors do. So, if a user reconfigures a sink connector with a different set of topics and we don't reset the topics based on the new configs (my thought here was to match the new configuration with the set of active topics), the old topics, no longer listed in the connector's configuration, will keep showing up as active topics. The user will have to explicitly reset the active topics after reconfiguring to avoid this. If there's consensus that preserving this asymmetry is worse than having to reset the active topics, I'm happy to change this in the KIP.

7. What I try to avoid here is the following situation: for some reason (a sequence of failures to write tombstones to the status topic), stale topic status records remain in that topic even after a connector has been deleted. Requiring users to restart a connector with the same name just to apply a follow-up reset of active topics doesn't seem necessary. I like the idea of decoupling connector existence from the maintenance of the status topic. Of
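The race in point 2 above ends with a single retained record per key once the topic is compacted. The sketch below models only that end state, under the assumption that compaction keeps the last value written for each key; the `compact` function is a hypothetical illustration (real log compaction runs asynchronously on the broker), and the keys and values are copied from the example in the email.

```python
def compact(log):
    """Return the end state of a compacted log: the last value per key."""
    latest = {}
    for key, value in log:
        latest[key] = value  # a later record with the same key replaces the earlier one
    return latest


key = "topic-test-topic-connector-C1"
log = [
    (key, {"connector": "some-source", "task": "some-source-TT11", "name": "test-topic"}),
    (key, {"connector": "some-source", "task": "some-source-TT22", "name": "test-topic"}),
]
retained = compact(log)
```

Whichever task wrote last determines the `task` field that survives, which is why the task ID is mainly useful for troubleshooting rather than as an authoritative owner.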
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Almog,

You raise some interesting questions. Comments inline below.

On Wed, Jan 15, 2020 at 11:19 AM Almog Gavra wrote:

> Hi Konstantine,
>
> Thanks for the KIP! This is going to make automatic integration with Connect much more powerful.
>
> My thoughts are mostly around freshness of the data and being able to expose that to users. Riffing on Randall's timestamp question - have we considered adding some interval at which point a connector will republish any topics that it encounters and update the timestamp? That way we have some refreshing mechanism that isn't as powerful as the complete reset (which may not be practical in many scenarios).

My question about recording the timestamp at which each active topic record was (infrequently) written was more about making a bit more information available given the current design, and whether recording a bit more information (for very little additional storage cost and no extra runtime cost) may be worth it if in the future we figure out how to use this information.

I think it's more complicated to try to record the history of when topics were most recently used, since that requires recording a lot more active topic records than the current proposal. Besides, it's not unexpected that source and sink connectors sometimes don't use topics for periods of time. A sink connector only consumes from a topic when there are additional records to consume, and a source connector only needs to write to a topic when there is information in the upstream system targeted to that topic. An example of the latter is that most database connectors will write to a topic for a particular table only when rows in that table have changed.

> I also agree with Randall's other point (Would it be better to not automatically reset connector's active topics when a sink connector is restarted?). I think keeping the behavior as symmetrical between sink and source connectors is a good idea.
> > Lastly, with regards to the API, I can imagine it is also pretty useful to > answer the inverse question: "which connectors write to topic X". Perhaps > we can achieve this by letting the users compute it and just expose an API > that returns the entire mapping at once (instead of needing to call the > /connectors/{name}/topics endpoint for each connector). It may be worth considering a method such as /topics that would produce a result that is an aggregate of the potentially many /connectors/{name}/topic responses, especially if the result of the /topics is an array of the individual responses from /connectors/{name}/topic. The benefit of a single request is that the answer to "which connectors use topic X" can be computed using simple tools such as 'jq'. And, if tooling frequently fetches the active topic information for all connectors, providing the aggregate method would reduce the load on the tooling and the server. It may also be relatively easy to implement. > Otherwise, looks good to me! Hits the requirements that I had in mind on > the nose. > - Almog > > On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley wrote: > > > Hi Konstantine, > > > > Thanks for the KIP, I can see how it could be useful. > > > > a) Did you consider using a metric for this? I don't think it would > satisfy > > all the use cases you have in mind, but you could mention it in the > > rejected alternatives. > > > > b) If the topic name contains the string "-connector" then the key format > > is ambiguous. This isn't necessarily fatal because the value will > > disambiguate, but it could be misleading. Any reason not to just use a > JSON > > key, and simplify the value? > > > > c) I didn't understand this part: "As soon as a worker detects the > addition > > of a topic to a connector's set of active topics, the worker will cease > to > > post update messages to the status.storage.topic for that connector. ". > I'm > > sure I've overlooking something but why is this necessary? 
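The inverse question raised in this message ("which connectors use topic X") can be answered client-side from an aggregate response. The sketch below assumes a hypothetical response shape, a mapping from connector name to a list of topic names; the thread does not specify the actual payload of `/connectors/{name}/topics` or of a possible `/topics` method, and the connector and topic names are made up for illustration.

```python
from collections import defaultdict


def connectors_by_topic(topics_by_connector):
    """Invert a connector -> topics mapping into topic -> sorted connector names."""
    inverse = defaultdict(set)
    for connector, topics in topics_by_connector.items():
        for topic in topics:
            inverse[topic].add(connector)
    return {topic: sorted(names) for topic, names in inverse.items()}


# Hypothetical aggregate of per-connector active topics.
aggregate = {
    "jdbc-source": ["orders", "customers"],
    "s3-sink": ["orders"],
}
by_topic = connectors_by_topic(aggregate)
```

With a single aggregate request the inversion is a few lines of client code, which is the argument for exposing the whole mapping at once instead of one call per connector.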
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi Konstantine, Thanks for the KIP! This is going to make automatic integration with Connect much more powerful. My thoughts are mostly around freshness of the data and being able to expose that to users. Riffing on Randall's timestamp question - have we considered adding some interval at which point a connector will republish any topics that it encounters and update the timestamp? That way we have some refreshing mechanism that isn't as powerful as the complete reset (which may not be practical in many scenarios). I also agree with Randall's other point (Would it be better to not automatically reset connector's active topics when a sink connector is restarted?). I think keeping the behavior as symmetrical between sink and source connectors is a good idea. Lastly, with regards to the API, I can imagine it is also pretty useful to answer the inverse question: "which connectors write to topic X". Perhaps we can achieve this by letting the users compute it and just expose an API that returns the entire mapping at once (instead of needing to call the /connectors/{name}/topics endpoint for each connector). Otherwise, looks good to me! Hits the requirements that I had in mind on the nose. - Almog On Wed, Jan 15, 2020 at 1:14 AM Tom Bentley wrote: > Hi Konstantine, > > Thanks for the KIP, I can see how it could be useful. > > a) Did you consider using a metric for this? I don't think it would satisfy > all the use cases you have in mind, but you could mention it in the > rejected alternatives. > > b) If the topic name contains the string "-connector" then the key format > is ambiguous. This isn't necessarily fatal because the value will > disambiguate, but it could be misleading. Any reason not to just use a JSON > key, and simplify the value? > > c) I didn't understand this part: "As soon as a worker detects the addition > of a topic to a connector's set of active topics, the worker will cease to > post update messages to the status.storage.topic for that connector. ". 
I'm > sure I've overlooking something but why is this necessary? Is this were the > task id in the value is used? > > Thanks again, > > Tom > > On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch wrote: > > > Oh, one more thing: > > > > 9. There's no mention of how the status topic is partitioned, or how > > partitioning will be used by the new topic records. The KIP should > probably > > outline this for clarity and completeness. > > > > Best regards, > > > > Randall > > > > On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch wrote: > > > > > Thanks, Konstantine. Overall, this KIP looks interesting and really > > > useful, and for the most part is spot on. I do have a number of > > > questions/comments about specifics: > > > > > >1. The topic records have a value that includes the connector name, > > >task number that last reported the topic is used, and the topic > name. > > >There's no mention of record timestamps, but I wonder if it'd be > > useful to > > >record this. One challenge might be that a connector does not write > > to a > > >topic for a while or the task remains running for long periods of > > time and > > >therefore the worker doesn't record that this topic has been newly > > written > > >to since it the task was restarted. IOW, the semantics of the > > timestamp may > > >be a bit murky. Have you thought about recording the timestamp, and > > if so > > >what are the pros and cons? > > >- The "Recording active topics" section says the following: > > > "As soon as a worker detects the addition of a topic to a > > > connector's set of active topics, all the connector's tasks that > > inspect > > > source or sink records will cease to post update messages to the > > > status.storage.topic." > > > This probably means the timestamp won't be very useful. > > >2. The KIP says "the Kafka record value stores the ID of the task > that > > >succeeded to store a topic status record last." 
However, this is a > bit > > >unclear: is it really storing the last task that successfully wrote > > to that > > >topic (as this would require very frequent writes to this topic), or > > is it > > >more that this is the task that was last *recorded* as having > written > > >to the topic? (Here, "recorded" could be a bit of a gray area, since > > this > > >would depend on the how the worker periodically records this > > information.) > > >Any kind of clarity here might be helpful. > > >3. In the "Recording active topics" section (and the surrounding > > >sections), the "task" is used ambiguously. For example, "when its > > tasks > > >start processing their first records ... these tasks will start > > inspecting > > >which is the Kafka topic of each of these records". IIUC, the first > > "task" > > >mentioned is the connector's task, and the second is the worker's > > task. Do > > >we need to distinguish this more clearly? > > >4.
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Hi Konstantine,

Thanks for the KIP, I can see how it could be useful.

a) Did you consider using a metric for this? I don't think it would satisfy all the use cases you have in mind, but you could mention it in the rejected alternatives.

b) If the topic name contains the string "-connector" then the key format is ambiguous. This isn't necessarily fatal because the value will disambiguate, but it could be misleading. Any reason not to just use a JSON key, and simplify the value?

c) I didn't understand this part: "As soon as a worker detects the addition of a topic to a connector's set of active topics, the worker will cease to post update messages to the status.storage.topic for that connector." I'm sure I'm overlooking something, but why is this necessary? Is this where the task id in the value is used?

Thanks again,

Tom

On Wed, Jan 15, 2020 at 12:15 AM Randall Hauch wrote:

> Oh, one more thing:
>
> 9. There's no mention of how the status topic is partitioned, or how partitioning will be used by the new topic records. The KIP should probably outline this for clarity and completeness.
>
> Best regards,
>
> Randall
>
> On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch wrote:
>
> > Thanks, Konstantine. Overall, this KIP looks interesting and really useful, and for the most part is spot on. I do have a number of questions/comments about specifics:
> >
> > 1. The topic records have a value that includes the connector name, task number that last reported the topic is used, and the topic name. There's no mention of record timestamps, but I wonder if it'd be useful to record this. One challenge might be that a connector does not write to a topic for a while or the task remains running for long periods of time and therefore the worker doesn't record that this topic has been newly written to since the task was restarted. IOW, the semantics of the timestamp may be a bit murky.
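Tom's point b) about the key format can be made concrete with a short sketch. It assumes the key layout used in this thread's example, `topic-<topic>-connector-<connector>`; the `make_key` and `make_json_key` helpers and the names passed to them are hypothetical. Under that layout, two different (topic, connector) pairs can yield the same string, whereas a structured key such as JSON keeps them apart.

```python
import json


def make_key(topic, connector):
    """Build a key in the dash-separated layout from the thread's example."""
    return f"topic-{topic}-connector-{connector}"


def make_json_key(topic, connector):
    """Hypothetical structured alternative, as suggested in point b)."""
    return json.dumps({"topic": topic, "connector": connector}, sort_keys=True)


# Topic "a-connector-b" with connector "c" versus topic "a" with connector "b-connector-c".
key_a = make_key("a-connector-b", "c")
key_b = make_key("a", "b-connector-c")

json_key_a = make_json_key("a-connector-b", "c")
json_key_b = make_json_key("a", "b-connector-c")
```

Both dash-separated keys come out as `topic-a-connector-b-connector-c`, so the key alone cannot be parsed back unambiguously; the record value (or a structured key) is needed to tell the pairs apart.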
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Oh, one more thing:

9. There's no mention of how the status topic is partitioned, or how partitioning will be used by the new topic records. The KIP should probably outline this for clarity and completeness.

Best regards,

Randall

On Tue, Jan 14, 2020 at 5:25 PM Randall Hauch wrote:

> Thanks, Konstantine. Overall, this KIP looks interesting and really useful, and for the most part is spot on. I do have a number of questions/comments about specifics:
>
> 1. The topic records have a value that includes the connector name, the number of the task that last reported the topic as used, and the topic name. There's no mention of record timestamps, but I wonder if it'd be useful to record this. One challenge might be that a connector does not write to a topic for a while or the task remains running for long periods of time and therefore the worker doesn't record that this topic has been newly written to since the task was restarted. IOW, the semantics of the timestamp may be a bit murky. Have you thought about recording the timestamp, and if so what are the pros and cons?
>    - The "Recording active topics" section says the following:
>      "As soon as a worker detects the addition of a topic to a connector's set of active topics, all the connector's tasks that inspect source or sink records will cease to post update messages to the status.storage.topic."
>      This probably means the timestamp won't be very useful.
> 2. The KIP says "the Kafka record value stores the ID of the task that succeeded to store a topic status record last." However, this is a bit unclear: is it really storing the last task that successfully wrote to that topic (as this would require very frequent writes to this topic), or is it more that this is the task that was last *recorded* as having written to the topic? (Here, "recorded" could be a bit of a gray area, since this would depend on how the worker periodically records this information.) Any kind of clarity here might be helpful.
> 3. In the "Recording active topics" section (and the surrounding sections), the term "task" is used ambiguously. For example, "when its tasks start processing their first records ... these tasks will start inspecting which is the Kafka topic of each of these records". IIUC, the first "task" mentioned is the connector's task, and the second is the worker's task. Do we need to distinguish this more clearly?
> 4. Maybe I missed it, but does this KIP explicitly say that the Connector API is unchanged? It's probably worth pointing out to help assuage any concerns that connector implementations have to change to make use of this feature.
> 5. In the "Resetting a connector's set of active topics" section the behavior is not exactly clear. Consider a user running connector "A": the connector has been fully started and is processing records, and the worker has recorded topic usage records. Then the user resets the active topics for connector A while the connector is still running. If the connector writes to no new topics before the tasks are rebalanced, is it correct that Connect would report no active topics? And after the tasks are rebalanced, will the worker record any topics used by connector A?
> 6. In the "Restaring" (misspelled) section: "Reconfiguring a source connector has also no altering effect for a source connector. However, when reconfiguring a sink connector if the new configuration no longer includes any of the previously tracked topics, these topics will be removed from the set of active topics for this sink connector by appending tombstone messages appropriately after the reconfiguration of the connector." Would it be better to not automatically reset a connector's active topics when a sink connector is restarted? Isn't that more consistent with the "Resetting" behavior and the goals at the top of the KIP: "it'd be useful for users, operators and applications to know which are the topics that a connector has used since it was first created"?
> 7. For the `PUT /connectors/{name}/topics/reset` endpoint, the KIP says "this request can be reapplied after the deletion of the connector". IOW, even though a connector with that name doesn't exist, we can still make this request? How does this compare with other methods such as "status"?
> 8. What are the security implications of this proposal?
>
> As you can see, most of these can probably be addressed without much work.
>
> Best regards,
>
> Randall
>
> On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <konstant...@confluent.io> wrote:
>
>> Hi all.
>>
>> I just posted KIP-558: Track the set of actively used topics by connectors in Kafka Connect
>>
>> Wiki link here:
>>
>>
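
To make the partitioning question in point 9 concrete, here is a minimal sketch of how a keyed topic record could map to a partition. It assumes a key built from a "topic-<name>" part and a "connector-<name>" part joined by ':', as discussed elsewhere in this thread. The function names are hypothetical, and CRC32 is only a stand-in hash: Kafka's default producer partitioner hashes the serialized key with murmur2, so the real partition numbers would differ.

```python
import zlib


def record_key(topic: str, connector: str) -> str:
    # Hypothetical key layout: a "topic-..." part and a "connector-..." part
    # joined by ':' (an assumption for illustration, not the KIP's exact text).
    return f"topic-{topic}:connector-{connector}"


def partition_for(key: str, num_partitions: int) -> int:
    # Stand-in for key-based partitioning. CRC32 is used here only to keep
    # the sketch dependency-free; it is NOT the hash Kafka uses.
    return zlib.crc32(key.encode("utf-8")) % num_partitions


key = record_key("orders", "jdbc-source")
partition = partition_for(key, 5)
```

The point of the sketch is that a deterministic hash of the key sends every record for a given (topic, connector) pair, including any later tombstone, to the same partition, so their relative order is preserved and compaction can act on them per key.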
Re: [DISCUSS] KIP-558: Track the set of actively used topics by connectors in Kafka Connect
Thanks, Konstantine. Overall, this KIP looks interesting and really useful, and for the most part is spot on. I do have a number of questions/comments about specifics:

1. The topic records have a value that includes the connector name, the number of the task that last reported the topic as used, and the topic name. There's no mention of record timestamps, but I wonder if it'd be useful to record this. One challenge might be that a connector does not write to a topic for a while or the task remains running for long periods of time and therefore the worker doesn't record that this topic has been newly written to since the task was restarted. IOW, the semantics of the timestamp may be a bit murky. Have you thought about recording the timestamp, and if so what are the pros and cons?
   - The "Recording active topics" section says the following:
     "As soon as a worker detects the addition of a topic to a connector's set of active topics, all the connector's tasks that inspect source or sink records will cease to post update messages to the status.storage.topic."
     This probably means the timestamp won't be very useful.
2. The KIP says "the Kafka record value stores the ID of the task that succeeded to store a topic status record last." However, this is a bit unclear: is it really storing the last task that successfully wrote to that topic (as this would require very frequent writes to this topic), or is it more that this is the task that was last *recorded* as having written to the topic? (Here, "recorded" could be a bit of a gray area, since this would depend on how the worker periodically records this information.) Any kind of clarity here might be helpful.
3. In the "Recording active topics" section (and the surrounding sections), the term "task" is used ambiguously. For example, "when its tasks start processing their first records ... these tasks will start inspecting which is the Kafka topic of each of these records". IIUC, the first "task" mentioned is the connector's task, and the second is the worker's task. Do we need to distinguish this more clearly?
4. Maybe I missed it, but does this KIP explicitly say that the Connector API is unchanged? It's probably worth pointing out to help assuage any concerns that connector implementations have to change to make use of this feature.
5. In the "Resetting a connector's set of active topics" section the behavior is not exactly clear. Consider a user running connector "A": the connector has been fully started and is processing records, and the worker has recorded topic usage records. Then the user resets the active topics for connector A while the connector is still running. If the connector writes to no new topics before the tasks are rebalanced, is it correct that Connect would report no active topics? And after the tasks are rebalanced, will the worker record any topics used by connector A?
6. In the "Restaring" (misspelled) section: "Reconfiguring a source connector has also no altering effect for a source connector. However, when reconfiguring a sink connector if the new configuration no longer includes any of the previously tracked topics, these topics will be removed from the set of active topics for this sink connector by appending tombstone messages appropriately after the reconfiguration of the connector." Would it be better to not automatically reset a connector's active topics when a sink connector is restarted? Isn't that more consistent with the "Resetting" behavior and the goals at the top of the KIP: "it'd be useful for users, operators and applications to know which are the topics that a connector has used since it was first created"?
7. For the `PUT /connectors/{name}/topics/reset` endpoint, the KIP says "this request can be reapplied after the deletion of the connector". IOW, even though a connector with that name doesn't exist, we can still make this request? How does this compare with other methods such as "status"?
8. What are the security implications of this proposal?

As you can see, most of these can probably be addressed without much work.

Best regards,

Randall

On Mon, Jan 13, 2020 at 11:05 PM Konstantine Karantasis <konstant...@confluent.io> wrote:

> Hi all.
>
> I just posted KIP-558: Track the set of actively used topics by connectors in Kafka Connect
>
> Wiki link here:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-558%3A+Track+the+set+of+actively+used+topics+by+connectors+in+Kafka+Connect
>
> I think it's a nice extension to follow up on KIP-158 and a useful feature for the ever-increasing number of applications that are built around Kafka Connect.
> Would love to hear what you think.
>
> Best,
> Konstantine
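
Points 1, 5 and 6 above all turn on how topic records and tombstones combine into a connector's set of active topics. The following is a minimal sketch of that log model, not the KIP's implementation: the key layout, the JSON field names (including "discoverTimestamp") and all function names are assumptions made for illustration.

```python
import json
from typing import Dict, List, Optional, Set, Tuple

# (key, value) pairs; a value of None models a tombstone record.
Record = Tuple[str, Optional[str]]


def topic_record(connector: str, topic: str, task: int, discover_ts: int) -> Record:
    # Hypothetical key and value layout, for illustration only.
    key = f"topic-{topic}:connector-{connector}"
    value = json.dumps({
        "connector": connector,
        "task": task,
        "topic": topic,
        "discoverTimestamp": discover_ts,
    })
    return key, value


def reset_records(connector: str, topics: Set[str]) -> List[Record]:
    # A reset is modeled as one tombstone per previously tracked topic.
    return [(f"topic-{t}:connector-{connector}", None) for t in sorted(topics)]


def active_topics(log: List[Record]) -> Dict[str, Set[str]]:
    # Replay the log in order, keeping only the latest value per key.
    latest: Dict[str, Optional[str]] = {}
    for key, value in log:
        latest[key] = value
    result: Dict[str, Set[str]] = {}
    for value in latest.values():
        if value is None:
            continue  # tombstoned: the topic is no longer in the active set
        meta = json.loads(value)
        result.setdefault(meta["connector"], set()).add(meta["topic"])
    return result


log = [topic_record("A", "orders", 0, 1000), topic_record("A", "users", 1, 1001)]
before_reset = active_topics(log)
log += reset_records("A", {"orders", "users"})
after_reset = active_topics(log)
log.append(topic_record("A", "orders", 0, 2000))
after_new_write = active_topics(log)
```

Under this model, connector A reports no active topics right after the reset, and "orders" reappears only once some task records it again. That is the behavior question 5 asks the KIP to spell out.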