[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Chris Egerton resolved KAFKA-16837.
-----------------------------------
    Fix Version/s: 3.9.0
       Resolution: Fixed

> Kafka Connect fails on update connector for incorrect previous Config Provider tasks
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16837
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16837
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 3.5.1, 3.6.1, 3.8.0
>            Reporter: Sergey Ivanov
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 3.9.0
>
>         Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector's config if the *previous* task config contains a ConfigProvider value that can no longer be resolved and leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but any ConfigProvider that raises an exception when something is wrong with the config (e.g. the resource doesn't exist) is affected.
> *Prerequisites:*
> Kafka Connect instance with config providers:
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
> 1. Create Kafka topic "test".
> 2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties" with content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
>   "status": {
>     "name": "local-file-sink",
>     "connector": {
>       "state": "RUNNING",
>       "worker_id": "10.10.10.10:8083"
>     },
>     "tasks": [
>       {
>         "id": 0,
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       }
>     ],
>     "type": "sink"
>   }
> }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeds{*} with 200.
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
>     "info": {
>       "name": "local-file-sink",
>       "config": {
>         "connector.class": "FileStreamSink",
>         "file": "/opt/kafka/test.sink.txt",
>         "tasks.max": "1",
>         "topics": "${file:/opt/kafka/provider2.properties:topics}",
>         "name": "local-file-sink"
>       },
>       "tasks": [
>         {
>           "connector": "local-file-sink",
>           "task": 0
>         }
>       ],
>       "type": "sink"
>     },
>     "status": {
>       "name": "local-file-sink",
>       "connector": {
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       },
>       "tasks": [
>         {
>           "id": 0,
>           "state": "FAILED",
>           "worker_id": "10.10.10.10:8083",
>           "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
>         }
>       ],
>       "type": "sink"
>     }
>   }
> }
> {code}
> The config has been updated, but the new task has not been created, so the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
>     at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>     at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>     at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>     at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>     at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>     at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>     at java.base/java.lang.Thread.run(Thread.java:840) {code}
> As I understand it, this happens because on connector update the AbstractHerder tries to update the current tasks:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051]
> and before doing so, the herder {+}tries to compare the old task config with the new one{+}. But it doesn't compare the original values; +it tries to get the ConfigProvider-resolved value for the previous task+ and fails, because the ConfigProvider can no longer read the file the previous task referenced.
> The main question: *do we really need to compare ConfigProvider-resolved* values there, instead of comparing the original configs?
> Right now this causes problems, since many ConfigProviders raise an exception when a resource is not found.
>
> As a workaround we can delete and re-create the connector instead of updating it. But there is one case where that doesn't help: KAFKA-16838

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
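The failure mode described in this issue can be illustrated in isolation. The sketch below uses hypothetical stand-in classes (`ConfigCompareSketch`, `resolve`), not Kafka's actual `ConfigProvider` API: a resolver that, like `FileConfigProvider`, throws when the referenced file is missing. It shows that comparing the raw placeholder strings detects the change without any I/O, while resolving the *old* config necessarily fails once its file is gone.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for config-placeholder resolution, not Kafka code.
public class ConfigCompareSketch {

    // Resolves "${file:<path>:<key>}" against an in-memory "filesystem",
    // throwing (as FileConfigProvider does) when the file is absent.
    static String resolve(String value, Map<String, Map<String, String>> files) {
        if (!value.startsWith("${file:")) {
            return value; // plain value, nothing to resolve
        }
        String body = value.substring("${file:".length(), value.length() - 1);
        String path = body.substring(0, body.lastIndexOf(':'));
        String key = body.substring(body.lastIndexOf(':') + 1);
        Map<String, String> props = files.get(path);
        if (props == null) {
            throw new IllegalStateException("Could not read properties from file " + path);
        }
        return props.get(key);
    }

    public static void main(String[] args) {
        // Only provider2.properties exists; the original file was renamed away.
        Map<String, Map<String, String>> files =
            Map.of("/opt/kafka/provider2.properties", Map.of("topics", "test"));

        String oldTopics = "${file:/opt/kafka/provider.properties:topics}";
        String newTopics = "${file:/opt/kafka/provider2.properties:topics}";

        // Comparing raw (unresolved) configs detects the change safely:
        System.out.println("raw configs differ: " + !Objects.equals(oldTopics, newTopics));

        // Comparing resolved configs must resolve the OLD config first,
        // which throws because its backing file no longer exists:
        try {
            resolve(oldTopics, files);
        } catch (IllegalStateException e) {
            System.out.println("resolving old config failed: " + e.getMessage());
        }
    }
}
```

This is why comparing original configs, rather than resolved ones, sidesteps the reported failure: the old placeholder is only needed as an opaque string for the equality check.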