[ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton resolved KAFKA-16837.
-----------------------------------
    Fix Version/s: 3.9.0
       Resolution: Fixed

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> ------------------------------------------------------------------------------------
>
>                 Key: KAFKA-16837
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16837
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 3.5.1, 3.6.1, 3.8.0
>            Reporter: Sergey Ivanov
>            Assignee: Chris Egerton
>            Priority: Major
>             Fix For: 3.9.0
>
>         Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider reference whose resolution 
> raises a ConfigException.
> Below is a simple test case that reproduces it with FileConfigProvider, but any 
> ConfigProvider that raises an exception when something is wrong with the config 
> (e.g. the referenced resource doesn't exist) will trigger it.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create a Kafka topic "test" (an illustrative AdminClient sketch follows).
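> A minimal sketch of this step with the Java Admin client (the bootstrap address 
> below is an assumption, not from the original report):
> {code:java}
> import java.util.List;
> import java.util.Properties;
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.AdminClientConfig;
> import org.apache.kafka.clients.admin.NewTopic;
> 
> public class CreateTestTopic {
>     public static void main(String[] args) throws Exception {
>         Properties props = new Properties();
>         // Assumed broker address; replace with your cluster's bootstrap servers
>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>         try (Admin admin = Admin.create(props)) {
>             // One partition and replication factor 1 are enough to reproduce the issue
>             admin.createTopics(List.of(new NewTopic("test", 1, (short) 1))).all().get();
>         }
>     }
> }
> {code}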
> 2. On the Kafka Connect instance, create the file 
> "/opt/kafka/provider.properties" with the content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileStreamSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
>     "status": {
>       "name": "local-file-sink",
>       "connector": {
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       },
>       "tasks": [
>         {
>           "id": 0,
>           "state": "RUNNING",
>           "worker_id": "10.10.10.10:8083"
>         }
>       ],
>       "type": "sink"
>     }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeds{*} with a 200 response. 
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
>     "info": {
>       "name": "local-file-sink",
>       "config": {
>         "connector.class": "FileStreamSink",
>         "file": "/opt/kafka/test.sink.txt",
>         "tasks.max": "1",
>         "topics": "${file:/opt/kafka/provider2.properties:topics}",
>         "name": "local-file-sink"
>       },
>       "tasks": [
>         {
>           "connector": "local-file-sink",
>           "task": 0
>         }
>       ],
>       "type": "sink"
>     },
>     "status": {
>       "name": "local-file-sink",
>       "connector": {
>         "state": "RUNNING",
>         "worker_id": "10.10.10.10:8083"
>       },
>       "tasks": [
>         {
>           "id": 0,
>           "state": "FAILED",
>           "worker_id": "10.10.10.10:8083",
>           "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
>         }
>       ],
>       "type": "sink"
>     }
>   }
> }
> {code}
> The config has been updated, but the new task has not been created, and as a 
> result the connector doesn't work.
> It fails with:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties
>  at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
>  at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
>  at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
>  at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>  at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
>  at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
>  at java.base/java.lang.Thread.run(Thread.java:840) {code}
> As I understand it, this happens because on connector update AbstractHerder 
> tries to update the current tasks:
> [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1051]
> and before doing so, the herder {+}compares the old task config with the new 
> one{+}. But it doesn't compare the original values: +it resolves the 
> ConfigProvider values of the previous task config+ and fails because the file 
> referenced by the previous task can no longer be read by the ConfigProvider.
> The main question: *do we really need to compare the ConfigProvider-resolved* 
> values there instead of comparing the original configs?
> Right now this causes real issues, since many ConfigProviders raise an 
> exception when the resource is not found.
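> For illustration, a simplified sketch of that comparison (this is NOT the 
> actual AbstractHerder code; resolve() is a hypothetical stand-in for the 
> ConfigProvider transformation performed via ClusterConfigState.taskConfig()):
> {code:java}
> import java.util.Map;
> 
> // Simplified sketch only, not the real Kafka Connect source.
> class TaskConfigComparisonSketch {
>     // Hypothetical stand-in for resolving ${provider:...} references
>     // through the configured ConfigProviders.
>     static Map<String, String> resolve(Map<String, String> rawConfig) {
>         // In the real code path, this is where ConfigException escapes
>         // when the referenced resource (e.g. the old file) is gone.
>         return rawConfig;
>     }
> 
>     static boolean taskConfigChanged(Map<String, String> storedRaw, Map<String, String> newRaw) {
>         Map<String, String> oldResolved = resolve(storedRaw); // throws here if the old file is missing
>         Map<String, String> newResolved = resolve(newRaw);    // never reached in that case
>         return !oldResolved.equals(newResolved);
>         // Comparing storedRaw.equals(newRaw) instead would not touch the providers at all.
>     }
> }
> {code}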
>  
> As a workaround we can delete and re-create the connector instead of updating 
> it (sketched below). But there is one case where even that doesn't help: 
> KAFKA-16838
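> A sketch of that workaround, using the same REST calls as above:
> {code:java}
> DELETE /connectors/local-file-sink
> 
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}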



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
