[ https://issues.apache.org/jira/browse/KAFKA-15113?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17736177#comment-17736177 ]
Chris Egerton edited comment on KAFKA-15113 at 6/22/23 3:35 PM:
----------------------------------------------------------------

[~yash.mayya] Thanks for filing this! One thing I'm struggling with is that, as far as I've been able to tell, there's no realistic use case for setting different bootstrap servers for, e.g., a sink connector's consumer and admin client, or a source connector's producer and admin client, or a sink connector's consumer and producer (which can be set up if the connector uses a DLQ topic).

If we don't want to support this use case, I think our effort might be better spent trying to make it easier for users to do the right thing instead of trying to gracefully recover when they've done the wrong thing.

One idea I've been toying with is adding support for declaring single properties in connector configurations that affect all Kafka clients spun up by the Connect runtime. For example, instead of specifying {{consumer.override.bootstrap.servers}}, {{producer.override.bootstrap.servers}}, and {{admin.override.bootstrap.servers}} in a connector config, we could allow users to simply declare {{kafka.clients.override.bootstrap.servers}}.

If we wanted to get fancier about this and avoid some of the compatibility constraints of adding framework-level properties to connector configurations (which always run the risk of conflicting with connector-defined properties), we might even expand the structure of connector configurations by separating the configs that apply to the connector itself from the ones that apply to its runtime-constructed Kafka clients, its key/value/header converters, etc. That could look something like this (assuming the request is issued against the {{POST /connectors}} endpoint):

{code:json}
{
  "name": "reddit-source",
  "connector.config": {
    "connector.class": "RedditSource",
    "tasks.max": "1",
    "posts.subreddits": "CatsStandingUp",
    "posts.topic": "reddit"
  },
  "kafka.clients.config": {
    "bootstrap.servers": "localhost:9093",
    "security.protocol": "PLAINTEXT"
  },
  "producer.config": {
    "buffer.memory": "4194304"
  }
}
{code}

Both of these would come with the advantage that, if users start actually using the feature, it'd be harder to screw up connector configurations. Of course, we would still have to decide if/how to handle misconfiguration by the user, but it might allow us to pursue more opinionated options, like failing requests (and even rejecting connector configurations), which IMO is a fine option as long as we provide a clear error message with easy-to-follow instructions on how to correct the connector configuration.

TL;DR: We should start rejecting connector configurations (and failing offset modification requests for connectors) that have mismatched bootstrap servers across Kafka clients, but we should also make it easier for users to correctly configure a connector with overridden client bootstrap servers, which will almost certainly require a KIP.
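For illustration, a minimal sketch of what that opinionated rejection could look like at config-submission time. This is not actual Connect code: the class name, resolution order, and error text are all assumptions, and per-client worker-level defaults ({{consumer.bootstrap.servers}} and friends) are ignored for brevity.

{code:java}
import java.util.Map;
import org.apache.kafka.common.config.ConfigException;

// Hypothetical worker-side check: reject connector configs whose Kafka
// clients would resolve to different bootstrap.servers values.
public class BootstrapServersCheck {

    public static void validate(Map<String, String> connectorConfig, String workerBootstrapServers) {
        String consumer = resolve(connectorConfig, "consumer.override.bootstrap.servers", workerBootstrapServers);
        String producer = resolve(connectorConfig, "producer.override.bootstrap.servers", workerBootstrapServers);
        String admin = resolve(connectorConfig, "admin.override.bootstrap.servers", workerBootstrapServers);

        // Plain string comparison is deliberately conservative: differently-ordered
        // or aliased server lists can still point at the same cluster, so a real
        // implementation would need a smarter equivalence rule (likely KIP-defined).
        if (!consumer.equals(producer) || !consumer.equals(admin)) {
            throw new ConfigException(
                "This connector's consumer, producer, and admin clients are configured with "
                + "different bootstrap.servers values (" + consumer + ", " + producer + ", "
                + admin + "). Please target the same Kafka cluster with all of a connector's "
                + "clients, e.g. by setting a single override that applies to all of them.");
        }
    }

    // Falls back to the worker's bootstrap.servers when no connector-level override is present.
    private static String resolve(Map<String, String> config, String key, String fallback) {
        String value = config.get(key);
        return value != null ? value : fallback;
    }
}
{code}

As the comment in the sketch notes, plain string equality is only an approximation of "same cluster"; a KIP would need to pin down the exact equivalence check.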
> Gracefully handle cases where a sink connector's admin and consumer client
> config overrides target different Kafka clusters
> ---------------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15113
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15113
>             Project: Kafka
>          Issue Type: Task
>          Components: KafkaConnect
>            Reporter: Yash Mayya
>            Priority: Minor
>
> Background reading -
> * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy]
> * [https://cwiki.apache.org/confluence/display/KAFKA/KIP-875%3A+First-class+offsets+support+in+Kafka+Connect]
>
> From [https://github.com/apache/kafka/pull/13434#discussion_r1144415671] -
> {quote}Currently, admin clients are only instantiated for sink connectors to create the DLQ topic if required. So it seems like it could be technically possible for a sink connector's consumer client overrides to target a different Kafka cluster from its producer and admin client overrides.
> Such a setup won't work with this implementation of the get offsets API, as it is using an admin client to get a sink connector's consumer group offsets. However, I'm not sure we want to use a consumer client to retrieve the offsets either, as we shouldn't be disrupting the existing sink tasks' consumer group just to fetch offsets. Leveraging a sink task's consumer also isn't an option, because fetching offsets for a stopped sink connector (where all the tasks will be stopped) should be allowed. I'm wondering if we should document that a connector's various client config override policies shouldn't target different Kafka clusters (side note - looks like we don't [currently document|https://kafka.apache.org/documentation/#connect] client config overrides for Connect beyond just the worker property {{connector.client.config.override.policy}}).
> {quote}
>
> {quote}I don't think we need to worry too much about this. I cannot imagine a sane use case that involves overriding a connector's Kafka clients with different Kafka clusters (not just bootstrap servers, but actually different clusters) for producer/consumer/admin. I'd be fine with adding a note to our docs that that kind of setup isn't supported, but I really, really hope that it's not necessary and nobody's trying to do that in the first place. I also suspect that there are other places where this might cause issues, like with exactly-once source support or automatic topic creation for source connectors.
> That said, there is a different case we may want to consider: someone may have configured consumer overrides for a sink connector, but not admin overrides. This may happen if they don't use a DLQ topic. I don't know if we absolutely need to handle this now, and we may consider filing a follow-up ticket to look into this, but one quick-and-dirty thought I've had is to configure the admin client used here with a combination of the configurations for the connector's admin client and its consumer, giving precedence to the latter.
> {quote}
>
> Also from [https://github.com/apache/kafka/pull/13818#discussion_r1224138055] -
> {quote}We will have undesirable behavior if the connector is targeting a Kafka cluster different from the Connect cluster's backing Kafka cluster and the user has configured the consumer overrides appropriately for their connector, but not the admin overrides (something we also discussed previously [here|https://github.com/apache/kafka/pull/13434#discussion_r1144415671]).
> In the above case, if a user attempts to reset their sink connector's offsets via the {{DELETE /connectors/\{connector}/offsets}} endpoint, the following will occur:
> # We list the consumer group offsets via {{Admin::listConsumerGroupOffsets}}, which returns an empty partition offsets map for the sink connector's consumer group ID (it exists on a different Kafka cluster to the one that the admin client is connecting to).
> # We call {{SinkConnector::alterOffsets}} with an empty offsets map, which could cause the sink connector to propagate the offsets-reset-related changes to the sink system.
> # We attempt to delete the consumer group via {{Admin::deleteConsumerGroups}}, which returns {{GroupIdNotFoundException}}, which we essentially swallow in order to keep offsets reset operations idempotent, and we return a success message to the user (even though the real consumer group for the sink connector on the other Kafka cluster hasn't been deleted).
> This will occur if the connector's admin overrides are missing OR the admin overrides are deliberately configured to target a Kafka cluster different from the consumer overrides (although, like you pointed out in the other linked thread, this doesn't seem like a valid use case that we'd even want to support).
> I guess we'd want to pursue the approach you suggested where we'd configure the admin client with a combination of the connector's admin overrides and consumer overrides?
> Another option could potentially be to somehow verify that the {{admin.override.bootstrap.servers}} in the connector's config / {{admin.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in the worker config (in order of preference) correspond to the same Kafka cluster as {{consumer.override.bootstrap.servers}} in the connector's config / {{consumer.bootstrap.servers}} in the worker config / {{bootstrap.servers}} in the worker config (in order of preference), and fail the request if we are able to reliably determine that they aren't pointing to the same Kafka cluster. I'm not sure that this is a feasible approach, however.
> Yet another option could be to remove the idempotency guarantee from the {{DELETE /connectors/\{connector}/offsets}} endpoint and, if we encounter a {{GroupIdNotFoundException}} from {{Admin::deleteConsumerGroups}}, return an error message to the user indicating that either the offsets have already been reset previously or else they might need to check their connector's admin overrides (this does seem fairly ugly, though).
> Edit: A more elegant way might be to switch the offset reset mechanism from deleting the consumer group to deleting the offsets for all topic partitions via {{Admin::deleteConsumerGroupOffsets}} (similar to what we do for the {{PATCH /connectors/\{connector}/offsets}} endpoint when the offset for a partition is specified as {{null}}). This way we could explicitly check for the existence of the sink connector's consumer group prior to listing its offsets, and fail requests if the consumer group doesn't exist (the minor downside is that this will require an additional admin client request).
> {quote}
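For illustration, a minimal sketch of the "combine the connector's admin and consumer configurations" idea from the quoted threads, with the consumer's values taking precedence. The class, method, and parameter names here are hypothetical, not actual Connect internals.

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;

public class SinkOffsetsAdminFactory {

    // Builds the admin client used for sink offset reads/resets from the worker's
    // defaults plus the connector's admin and consumer overrides, with the
    // consumer's values winning on conflict, so that a connector that only sets
    // consumer.override.* still gets an admin client aimed at the same cluster.
    public static Admin create(
            Map<String, Object> workerAdminDefaults,
            Map<String, Object> connectorAdminOverrides,
            Map<String, Object> connectorConsumerOverrides) {
        Map<String, Object> adminProps = new HashMap<>(workerAdminDefaults);
        adminProps.putAll(connectorAdminOverrides);
        adminProps.putAll(connectorConsumerOverrides);
        // Consumer-only properties copied this way (e.g. max.poll.records) are
        // unknown to the admin client; it logs a warning for unused configs but
        // otherwise ignores them.
        return Admin.create(adminProps);
    }
}
{code}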
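The "same Kafka cluster" verification floated above could, in principle, compare cluster IDs rather than raw {{bootstrap.servers}} strings (which can differ in ordering or aliasing while still naming the same cluster). A sketch, assuming we're willing to pay for two extra {{describeCluster}} calls and to skip the check when a cluster ID isn't available:

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;

public class ClusterIdCheck {

    // Returns true when both client configs can be shown to target the same
    // cluster, false when they provably target different clusters, and an empty
    // Optional when we cannot reliably tell (e.g. a cluster ID is unavailable),
    // in which case the request should not be failed.
    public static Optional<Boolean> sameCluster(Map<String, Object> adminProps,
                                                Map<String, Object> consumerProps)
            throws ExecutionException, InterruptedException {
        String adminClusterId = clusterId(adminProps);
        String consumerClusterId = clusterId(consumerProps);
        if (adminClusterId == null || consumerClusterId == null)
            return Optional.empty();
        return Optional.of(adminClusterId.equals(consumerClusterId));
    }

    private static String clusterId(Map<String, Object> props)
            throws ExecutionException, InterruptedException {
        try (Admin admin = Admin.create(props)) {
            // May be null if the brokers are too old to report a cluster ID.
            return admin.describeCluster().clusterId().get();
        }
    }
}
{code}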
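And a sketch of the edit's alternative reset mechanism: deleting per-partition offsets via {{Admin::deleteConsumerGroupOffsets}} instead of deleting the whole group. Here the "group exists" check is approximated by treating an empty offsets map as a failure (a real implementation might instead consult {{Admin::describeConsumerGroups}}); error handling and the surrounding Connect plumbing are omitted.

{code:java}
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class SinkOffsetsReset {

    // Resets a sink connector's offsets by deleting the committed offset for
    // every topic partition in its consumer group rather than deleting the
    // group itself. An empty offsets map now becomes a hard failure instead of
    // being silently swallowed, which surfaces mismatched admin/consumer
    // overrides to the user.
    public static void resetOffsets(Admin admin, String groupId)
            throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> offsets =
            admin.listConsumerGroupOffsets(groupId)
                 .partitionsToOffsetAndMetadata()
                 .get();
        if (offsets.isEmpty()) {
            throw new IllegalStateException(
                "No consumer group '" + groupId + "' was found on the targeted Kafka cluster; "
                + "check the connector's admin.override.* and consumer.override.* configurations.");
        }
        admin.deleteConsumerGroupOffsets(groupId, offsets.keySet()).all().get();
    }
}
{code}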