[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks
[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-16837: -- Fix Version/s: (was: 3.9.0) > Kafka Connect fails on update connector for incorrect previous Config > Provider tasks > > > Key: KAFKA-16837 > URL: https://issues.apache.org/jira/browse/KAFKA-16837 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.5.1, 3.6.1, 3.8.0 >Reporter: Sergey Ivanov >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0, 3.7.1 > > Attachments: kafka_connect_config.png > > > Hello, > We faced an issue when is not possible to update Connector config if the > *previous* task contains ConfigProvider's value with incorrect value that > leads to ConfigException. > I can provide simple Test Case to reproduce it with FileConfigProvider, but > actually any ConfigProvider is acceptable that could raise exception if > something wrong with config (like resource doesn't exist). > *Prerequisites:* > Kafka Connect instance with config providers: > > {code:java} > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code} > > 1. Create Kafka topic "test" > 2. On the Kafka Connect instance create the file > "/opt/kafka/provider.properties" with content > {code:java} > topics=test > {code} > 3. Create simple FileSink connector: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider.properties:topics}" > } > {code} > 4. Checks that everything works fine: > {code:java} > GET /connectors?expand=info=status > ... > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > } > ], > "type": "sink" > } > } > } > {code} > Looks fine. > 5. Renames the file to "/opt/kafka/provider2.properties". > 6. Update connector with new correct file name: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider2.properties:topics}" > } > {code} > Update {*}succeed{*}, got 200. > 7. Checks that everything works fine: > {code:java} > { > "local-file-sink": { > "info": { > "name": "local-file-sink", > "config": { > "connector.class": "FileStreamSink", > "file": "/opt/kafka/test.sink.txt", > "tasks.max": "1", > "topics": "${file:/opt/kafka/provider2.properties:topics}", > "name": "local-file-sink" > }, > "tasks": [ > { > "connector": "local-file-sink", > "task": 0 > } > ], > "type": "sink" > }, > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "FAILED", > "worker_id": "10.10.10.10:8083", > "trace": "org.apache.kafka.common.errors.InvalidTopicException: > Invalid topics: [${file:/opt/kafka/provider.properties:topics}]" > } > ], > "type": "sink" > } > } > } > {code} > Config has been updated, but new task has not been created. And as result > connector doesn't work. > It failed on: > {code:java} > [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= > ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] > [Worker clientId=connect-1, groupId=streaming-service_streaming_service] > Failed to reconfigure connector's tasks (local-file-sink), retrying after > backoff. > org.apache.kafka.common.config.ConfigException: Could not read properties > from file /opt/kafka/provider.properties > at > org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98) > at > org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58) > at > org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181) > at > org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804) > at >
[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks
[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-16837: -- Fix Version/s: 3.7.1 > Kafka Connect fails on update connector for incorrect previous Config > Provider tasks > > > Key: KAFKA-16837 > URL: https://issues.apache.org/jira/browse/KAFKA-16837 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.5.1, 3.6.1, 3.8.0 >Reporter: Sergey Ivanov >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0, 3.7.1, 3.9.0 > > Attachments: kafka_connect_config.png > > > Hello, > We faced an issue when is not possible to update Connector config if the > *previous* task contains ConfigProvider's value with incorrect value that > leads to ConfigException. > I can provide simple Test Case to reproduce it with FileConfigProvider, but > actually any ConfigProvider is acceptable that could raise exception if > something wrong with config (like resource doesn't exist). > *Prerequisites:* > Kafka Connect instance with config providers: > > {code:java} > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code} > > 1. Create Kafka topic "test" > 2. On the Kafka Connect instance create the file > "/opt/kafka/provider.properties" with content > {code:java} > topics=test > {code} > 3. Create simple FileSink connector: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider.properties:topics}" > } > {code} > 4. Checks that everything works fine: > {code:java} > GET /connectors?expand=info=status > ... > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > } > ], > "type": "sink" > } > } > } > {code} > Looks fine. > 5. Renames the file to "/opt/kafka/provider2.properties". > 6. Update connector with new correct file name: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider2.properties:topics}" > } > {code} > Update {*}succeed{*}, got 200. > 7. Checks that everything works fine: > {code:java} > { > "local-file-sink": { > "info": { > "name": "local-file-sink", > "config": { > "connector.class": "FileStreamSink", > "file": "/opt/kafka/test.sink.txt", > "tasks.max": "1", > "topics": "${file:/opt/kafka/provider2.properties:topics}", > "name": "local-file-sink" > }, > "tasks": [ > { > "connector": "local-file-sink", > "task": 0 > } > ], > "type": "sink" > }, > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "FAILED", > "worker_id": "10.10.10.10:8083", > "trace": "org.apache.kafka.common.errors.InvalidTopicException: > Invalid topics: [${file:/opt/kafka/provider.properties:topics}]" > } > ], > "type": "sink" > } > } > } > {code} > Config has been updated, but new task has not been created. And as result > connector doesn't work. > It failed on: > {code:java} > [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= > ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] > [Worker clientId=connect-1, groupId=streaming-service_streaming_service] > Failed to reconfigure connector's tasks (local-file-sink), retrying after > backoff. > org.apache.kafka.common.config.ConfigException: Could not read properties > from file /opt/kafka/provider.properties > at > org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98) > at > org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58) > at > org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181) > at > org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804) > at >
[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks
[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-16837: -- Fix Version/s: 3.8.0 > Kafka Connect fails on update connector for incorrect previous Config > Provider tasks > > > Key: KAFKA-16837 > URL: https://issues.apache.org/jira/browse/KAFKA-16837 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.5.1, 3.6.1, 3.8.0 >Reporter: Sergey Ivanov >Assignee: Chris Egerton >Priority: Major > Fix For: 3.8.0, 3.9.0 > > Attachments: kafka_connect_config.png > > > Hello, > We faced an issue when is not possible to update Connector config if the > *previous* task contains ConfigProvider's value with incorrect value that > leads to ConfigException. > I can provide simple Test Case to reproduce it with FileConfigProvider, but > actually any ConfigProvider is acceptable that could raise exception if > something wrong with config (like resource doesn't exist). > *Prerequisites:* > Kafka Connect instance with config providers: > > {code:java} > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code} > > 1. Create Kafka topic "test" > 2. On the Kafka Connect instance create the file > "/opt/kafka/provider.properties" with content > {code:java} > topics=test > {code} > 3. Create simple FileSink connector: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider.properties:topics}" > } > {code} > 4. Checks that everything works fine: > {code:java} > GET /connectors?expand=info=status > ... > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > } > ], > "type": "sink" > } > } > } > {code} > Looks fine. > 5. Renames the file to "/opt/kafka/provider2.properties". > 6. Update connector with new correct file name: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider2.properties:topics}" > } > {code} > Update {*}succeed{*}, got 200. > 7. Checks that everything works fine: > {code:java} > { > "local-file-sink": { > "info": { > "name": "local-file-sink", > "config": { > "connector.class": "FileStreamSink", > "file": "/opt/kafka/test.sink.txt", > "tasks.max": "1", > "topics": "${file:/opt/kafka/provider2.properties:topics}", > "name": "local-file-sink" > }, > "tasks": [ > { > "connector": "local-file-sink", > "task": 0 > } > ], > "type": "sink" > }, > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "FAILED", > "worker_id": "10.10.10.10:8083", > "trace": "org.apache.kafka.common.errors.InvalidTopicException: > Invalid topics: [${file:/opt/kafka/provider.properties:topics}]" > } > ], > "type": "sink" > } > } > } > {code} > Config has been updated, but new task has not been created. And as result > connector doesn't work. > It failed on: > {code:java} > [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= > ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] > [Worker clientId=connect-1, groupId=streaming-service_streaming_service] > Failed to reconfigure connector's tasks (local-file-sink), retrying after > backoff. > org.apache.kafka.common.config.ConfigException: Could not read properties > from file /opt/kafka/provider.properties > at > org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98) > at > org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58) > at > org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181) > at > org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804) > at >
[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks
[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Ivanov updated KAFKA-16837: -- Description: Hello, We faced an issue when is not possible to update Connector config if the *previous* task contains ConfigProvider's value with incorrect value that leads to ConfigException. I can provide simple Test Case to reproduce it with FileConfigProvider, but actually any ConfigProvider is acceptable that could raise exception if something wrong with config (like resource doesn't exist). *Prerequisites:* Kafka Connect instance with config providers: {code:java} config.providers=file config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code} 1. Create Kafka topic "test" 2. On the Kafka Connect instance create the file "/opt/kafka/provider.properties" with content {code:java} topics=test {code} 3. Create simple FileSink connector: {code:java} PUT /connectors/local-file-sink/config { "connector.class": "FileStreamSink", "tasks.max": "1", "file": "/opt/kafka/test.sink.txt", "topics": "${file:/opt/kafka/provider.properties:topics}" } {code} 4. Checks that everything works fine: {code:java} GET /connectors?expand=info=status ... "status": { "name": "local-file-sink", "connector": { "state": "RUNNING", "worker_id": "10.10.10.10:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.10.10.10:8083" } ], "type": "sink" } } } {code} Looks fine. 5. Renames the file to "/opt/kafka/provider2.properties". 6. Update connector with new correct file name: {code:java} PUT /connectors/local-file-sink/config { "connector.class": "FileStreamSink", "tasks.max": "1", "file": "/opt/kafka/test.sink.txt", "topics": "${file:/opt/kafka/provider2.properties:topics}" } {code} Update {*}succeed{*}, got 200. 7. Checks that everything works fine: {code:java} { "local-file-sink": { "info": { "name": "local-file-sink", "config": { "connector.class": "FileStreamSink", "file": "/opt/kafka/test.sink.txt", "tasks.max": "1", "topics": "${file:/opt/kafka/provider2.properties:topics}", "name": "local-file-sink" }, "tasks": [ { "connector": "local-file-sink", "task": 0 } ], "type": "sink" }, "status": { "name": "local-file-sink", "connector": { "state": "RUNNING", "worker_id": "10.10.10.10:8083" }, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "10.10.10.10:8083", "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]" } ], "type": "sink" } } } {code} Config has been updated, but new task has not been created. And as result connector doesn't work. It failed on: {code:java} [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff. org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98) at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58) at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181) at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371) at
[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks
[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Ivanov updated KAFKA-16837: -- Description: Hello, We faced an issue when is not possible to update Connector config if the *previous* task contains ConfigProvider's value with incorrect value that leads to ConfigException. I can provide simple Test Case to reproduce it with FileConfigProvider, but actually any ConfigProvider is acceptable that could raise exception if something wrong with config (like resource doesn't exist). *Prerequisites:* Kafka Connect instance with config providers: {code:java} config.providers=file config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code} 1. Create Kafka topic "test" 2. On the KK instance create the file "/opt/kafka/provider.properties" with content {code:java} topics=test {code} 3. Create simple FileSink connector: {code:java} PUT /connectors/local-file-sink/config { "connector.class": "FileStreamSink", "tasks.max": "1", "file": "/opt/kafka/test.sink.txt", "topics": "${file:/opt/kafka/provider.properties:topics}" } {code} 4. Checks that everything works fine: {code:java} GET /connectors?expand=info=status ... "status": { "name": "local-file-sink", "connector": { "state": "RUNNING", "worker_id": "10.10.10.10:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.10.10.10:8083" } ], "type": "sink" } } } {code} Looks fine. 5. Renames the file to "/opt/kafka/provider2.properties". 6. Update connector with new correct file name: {code:java} PUT /connectors/local-file-sink/config { "connector.class": "FileStreamSink", "tasks.max": "1", "file": "/opt/kafka/test.sink.txt", "topics": "${file:/opt/kafka/provider2.properties:topics}" } {code} Update {*}succeed{*}, got 200. 7. Checks that everything works fine: {code:java} { "local-file-sink": { "info": { "name": "local-file-sink", "config": { "connector.class": "FileStreamSink", "file": "/opt/kafka/test.sink.txt", "tasks.max": "1", "topics": "${file:/opt/kafka/provider2.properties:topics}", "name": "local-file-sink" }, "tasks": [ { "connector": "local-file-sink", "task": 0 } ], "type": "sink" }, "status": { "name": "local-file-sink", "connector": { "state": "RUNNING", "worker_id": "10.10.10.10:8083" }, "tasks": [ { "id": 0, "state": "FAILED", "worker_id": "10.10.10.10:8083", "trace": "org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [${file:/opt/kafka/provider.properties:topics}]" } ], "type": "sink" } } } {code} Config has been updated, but new task has not been created. And as result connector doesn't work. It failed on: {code:java} [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] [Worker clientId=connect-1, groupId=streaming-service_streaming_service] Failed to reconfigure connector's tasks (local-file-sink), retrying after backoff. org.apache.kafka.common.config.ConfigException: Could not read properties from file /opt/kafka/provider.properties at org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98) at org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) at org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58) at org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181) at org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371) at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539) at
[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks
[ https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sergey Ivanov updated KAFKA-16837: -- Attachment: kafka_connect_config.png > Kafka Connect fails on update connector for incorrect previous Config > Provider tasks > > > Key: KAFKA-16837 > URL: https://issues.apache.org/jira/browse/KAFKA-16837 > Project: Kafka > Issue Type: Bug > Components: connect >Affects Versions: 3.5.1, 3.6.1, 3.8.0 >Reporter: Sergey Ivanov >Priority: Major > Attachments: kafka_connect_config.png > > > Hello, > We faced an issue when is not possible to update Connector config if the > *previous* task contains ConfigProvider's value with incorrect value that > leads to ConfigException. > I can provide simple Test Case to reproduce it with FileConfigProvider, but > actually any ConfigProvider is acceptable that could raise exception if > something wrong with config (like resource doesn't exist). > *Prerequisites:* > Kafka Connect instance with config providers: > > {code:java} > config.providers=file > config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code} > > 1. Create Kafka topic "test" > 2. On the KK instance create the file "/opt/kafka/provider.properties" with > content > {code:java} > topics=test > {code} > 3. Create simple FileSink connector: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider.properties:topics}" > } > {code} > 4. Checks that everything works fine: > {code:java} > GET /connectors?expand=info=status > ... > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > } > ], > "type": "sink" > } > } > } > {code} > Looks fine. > 5. Renames the file to "/opt/kafka/provider2.properties". > 6. Update connector with new correct file name: > {code:java} > PUT /connectors/local-file-sink/config > { > "connector.class": "FileStreamSink", > "tasks.max": "1", > "file": "/opt/kafka/test.sink.txt", > "topics": "${file:/opt/kafka/provider2.properties:topics}" > } > {code} > Update {*}succeed{*}, got 200. > 7. Checks that everything works fine: > {code:java} > { > "local-file-sink": { > "info": { > "name": "local-file-sink", > "config": { > "connector.class": "FileStreamSink", > "file": "/opt/kafka/test.sink.txt", > "tasks.max": "1", > "topics": "${file:/opt/kafka/provider2.properties:topics}", > "name": "local-file-sink" > }, > "tasks": [ > { > "connector": "local-file-sink", > "task": 0 > } > ], > "type": "sink" > }, > "status": { > "name": "local-file-sink", > "connector": { > "state": "RUNNING", > "worker_id": "10.10.10.10:8083" > }, > "tasks": [ > { > "id": 0, > "state": "FAILED", > "worker_id": "10.10.10.10:8083", > "trace": "org.apache.kafka.common.errors.InvalidTopicException: > Invalid topics: [${file:/opt/kafka/provider.properties:topics}]" > } > ], > "type": "sink" > } > } > } > {code} > Config has been updated, but new task has not been created. And as result > connector doesn't work. > It failed on: > {code:java} > [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= > ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44] > [Worker clientId=connect-1, groupId=streaming-service_streaming_service] > Failed to reconfigure connector's tasks (local-file-sink), retrying after > backoff. > org.apache.kafka.common.config.ConfigException: Could not read properties > from file /opt/kafka/provider.properties > at > org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98) > at > org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103) > at > org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58) > at > org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181) > at > org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804) > at >