[
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Chris Egerton resolved KAFKA-16837.
-----------------------------------
Fix Version/s: 3.9.0
Resolution: Fixed
> Kafka Connect fails on update connector for incorrect previous Config
> Provider tasks
> ------------------------------------------------------------------------------------
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
> Issue Type: Bug
> Components: connect
> Affects Versions: 3.5.1, 3.6.1, 3.8.0
> Reporter: Sergey Ivanov
> Assignee: Chris Egerton
> Priority: Major
> Fix For: 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if
> the *previous* task config contains a ConfigProvider reference that can no
> longer be resolved, which leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but
> any ConfigProvider that raises an exception when something is wrong with the
> config (for example, the resource doesn't exist) will do.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
> {code}
>
> 1. Create the Kafka topic "test".
> 2. On the Kafka Connect instance, create the file
> "/opt/kafka/provider.properties" with the following content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileStreamSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
>     "status": {
>       "name": "local-file-sink",
>       "connector": {
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       },
>       "tasks": [
>         {
>           "id": 0,
>           "state": "RUNNING",
>           "worker_id": "10.10.10.10:8083"
>         }
>       ],
>       "type": "sink"
>     }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeds{*} with a 200 response.
> 7. Check whether everything works:
> {code:java}
> {
>   "local-file-sink": {
>     "info": {
>       "name": "local-file-sink",
>       "config": {
>         "connector.class": "FileStreamSink",
>         "file": "/opt/kafka/test.sink.txt",
>         "tasks.max": "1",
>         "topics": "${file:/opt/kafka/provider2.properties:topics}",
>         "name": "local-file-sink"
>       },
>       "tasks": [
>         {
>           "connector": "local-file-sink",
>           "task": 0
>         }
>       ],
>       "type": "sink"
>     },
>     "status": {
>       "name": "local-file-sink",
>       "connector": {
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       },
>       "tasks": [
>         {
>           "id": 0,
>           "state": "FAILED",
>           "worker_id": "10.10.10.10:8083",
>           "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
>         }
>       ],
>       "type": "sink"
>     }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a
> result, the connector doesn't work.
> It fails with:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
>     at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>     at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>     at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>     at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>     at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840)
> {code}
> As I understand it, this happens because on a connector update AbstractHerder
> tries to update the current tasks:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051]
> and, before doing so, the herder {+}compares the old task config with the new one{+}.
> But it doesn't compare the original values: +it tries to get the
> ConfigProvider-resolved value for the previous task+ and fails, because the
> ConfigProvider can no longer read the file that the previous task refers to.
> The main question: *do we really need to compare ConfigProvider-resolved*
> values there instead of comparing the original configs?
> As it stands, this causes problems because many ConfigProviders raise an
> exception if a resource is not found.
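
The difference between the two comparison strategies discussed above can be illustrated with a small, self-contained sketch. It does not use the Kafka Connect classes from the stack trace; `TaskConfigComparison`, `resolve`, `changedByResolvedValues` and `changedByRawValues` are hypothetical names, and the `resources` map is an assumed stand-in for whatever a ConfigProvider can currently read.

```java
import java.util.HashMap;
import java.util.Map;

public class TaskConfigComparison {

    // Hypothetical stand-in for a ConfigProvider lookup: a value of the form
    // "${ref}" is looked up in `resources`; a missing entry raises an exception,
    // much like FileConfigProvider does for a file that no longer exists.
    public static String resolve(String value, Map<String, String> resources) {
        if (value.startsWith("${") && value.endsWith("}")) {
            String ref = value.substring(2, value.length() - 1);
            String resolved = resources.get(ref);
            if (resolved == null) {
                throw new IllegalStateException("Could not read " + ref);
            }
            return resolved;
        }
        return value;
    }

    // Resolves every value of a config map.
    public static Map<String, String> resolveAll(Map<String, String> raw,
                                                 Map<String, String> resources) {
        Map<String, String> out = new HashMap<>();
        for (Map.Entry<String, String> e : raw.entrySet()) {
            out.put(e.getKey(), resolve(e.getValue(), resources));
        }
        return out;
    }

    // Compares resolved values: the OLD config must still be resolvable,
    // otherwise the comparison itself throws.
    public static boolean changedByResolvedValues(Map<String, String> oldConfig,
                                                  Map<String, String> newConfig,
                                                  Map<String, String> resources) {
        return !resolveAll(oldConfig, resources).equals(resolveAll(newConfig, resources));
    }

    // Compares raw values: the provider is never consulted.
    public static boolean changedByRawValues(Map<String, String> oldConfig,
                                             Map<String, String> newConfig) {
        return !oldConfig.equals(newConfig);
    }

    public static void main(String[] args) {
        // Only the renamed file is readable, as after step 5 of the reproduction.
        Map<String, String> resources =
                Map.of("file:/opt/kafka/provider2.properties:topics", "test");
        Map<String, String> oldTask =
                Map.of("topics", "${file:/opt/kafka/provider.properties:topics}");
        Map<String, String> newTask =
                Map.of("topics", "${file:/opt/kafka/provider2.properties:topics}");

        System.out.println("raw comparison, changed: " + changedByRawValues(oldTask, newTask));
        try {
            changedByResolvedValues(oldTask, newTask, resources);
        } catch (IllegalStateException e) {
            System.out.println("resolved comparison failed: " + e.getMessage());
        }
    }
}
```

One trade-off of this sketch: a raw comparison will not notice a change in the content behind an unchanged placeholder, so it is shown here only to make the question concrete, not as the fix that was applied.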
>
> As a workaround, we can delete and recreate the connector instead of
> updating it. But there is one case where this doesn't help: KAFKA-16838
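
For reference, the delete-and-recreate workaround maps onto two Kafka Connect REST calls: `DELETE /connectors/{name}` followed by `PUT /connectors/{name}/config`. The sketch below only builds the requests with `java.net.http` and does not send them; `RecreateConnectorRequests` and `BASE_URL` are hypothetical, and the worker address is an assumption to be replaced with your own.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class RecreateConnectorRequests {

    // Assumed address of a Connect worker's REST API; adjust for your deployment.
    static final String BASE_URL = "http://localhost:8083";

    // Builds the request that removes the connector and its tasks.
    public static HttpRequest deleteConnector(String name) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/connectors/" + name))
                .DELETE()
                .build();
    }

    // Builds the request that creates (or updates) the connector from a JSON config.
    public static HttpRequest putConnectorConfig(String name, String configJson) {
        return HttpRequest.newBuilder(URI.create(BASE_URL + "/connectors/" + name + "/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(configJson))
                .build();
    }

    public static void main(String[] args) {
        // Same config as in step 6 of the reproduction.
        String config = "{"
                + "\"connector.class\": \"FileStreamSink\","
                + "\"tasks.max\": \"1\","
                + "\"file\": \"/opt/kafka/test.sink.txt\","
                + "\"topics\": \"${file:/opt/kafka/provider2.properties:topics}\""
                + "}";

        HttpRequest delete = deleteConnector("local-file-sink");
        HttpRequest create = putConnectorConfig("local-file-sink", config);

        System.out.println(delete.method() + " " + delete.uri());
        System.out.println(create.method() + " " + create.uri());
    }
}
```

To actually issue the calls, each request could be passed to `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, sending the delete first and the create only after it has succeeded.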