[jira] [Commented] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-29 Thread Sergey Ivanov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850359#comment-17850359
 ] 

Sergey Ivanov commented on KAFKA-16837:
---

Thanks for the answer, [~ChrisEgerton]!

I don't have enough Kafka source code experience to come to the right decision 
about that part, so I'm glad to hear you think the same about comparing original configs. 

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider placeholder with an incorrect 
> value that leads to a ConfigException.
> I can provide a simple test case to reproduce it with FileConfigProvider, but 
> actually any ConfigProvider that can raise an exception when something is wrong 
> with the config (for example, the resource doesn't exist) is affected.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
> {code}
>  
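> For illustration, a minimal standalone sketch (assuming only the properties 
> file described below; the class name is made up) of how such a placeholder is 
> resolved by FileConfigProvider:
> {code:java}
> import org.apache.kafka.common.config.ConfigData;
> import org.apache.kafka.common.config.provider.FileConfigProvider;
> 
> import java.util.Set;
> 
> public class ProviderResolutionSketch {
>     public static void main(String[] args) throws Exception {
>         // Resolve the "topics" key the same way Connect resolves
>         // "${file:/opt/kafka/provider.properties:topics}" at runtime.
>         try (FileConfigProvider provider = new FileConfigProvider()) {
>             ConfigData data =
>                 provider.get("/opt/kafka/provider.properties", Set.of("topics"));
>             System.out.println(data.data().get("topics")); // "test" while the file exists
>         }
>         // If the file cannot be read, provider.get(...) throws
>         // org.apache.kafka.common.config.ConfigException instead.
>     }
> }
> {code}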
> 1. Create a Kafka topic "test".
> 2. On the Kafka Connect instance, create the file 
> "/opt/kafka/provider.properties" with the following content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeds{*} with a 200 response. 
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, 
> the connector doesn't work.
> It fails with:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> 
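> To illustrate where this retry loop gets stuck, here is a minimal standalone 
> sketch (hypothetical code, outside of Connect) of the resolution step that 
> fails once the file has been renamed: the *old* task config is resolved 
> before it can be compared with the newly generated one.
> {code:java}
> import org.apache.kafka.common.config.ConfigTransformer;
> import org.apache.kafka.common.config.provider.ConfigProvider;
> import org.apache.kafka.common.config.provider.FileConfigProvider;
> 
> import java.util.Map;
> 
> public class StaleTaskConfigTransformSketch {
>     public static void main(String[] args) {
>         // Same provider mapping as in the worker config above.
>         Map<String, ConfigProvider> providers = Map.of("file", new FileConfigProvider());
>         ConfigTransformer transformer = new ConfigTransformer(providers);
> 
>         // The previous task config still points at the renamed file.
>         Map<String, String> oldTaskConfig =
>             Map.of("topics", "${file:/opt/kafka/provider.properties:topics}");
> 
>         // Throws org.apache.kafka.common.config.ConfigException:
>         // "Could not read properties from file /opt/kafka/provider.properties"
>         transformer.transform(oldTaskConfig);
>     }
> }
> {code}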

[jira] [Commented] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-28 Thread Chris Egerton (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17850059#comment-17850059
 ] 

Chris Egerton commented on KAFKA-16837:
---

Thanks for raising this [~mrMigles], great find!

I don't think we need to compare config provider-resolved values for task 
configs, since we end up performing that resolution at the same time for both 
sets of configs and that's likely to generate the same values. It should be 
fine to compare original configs instead.
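
For illustration only, here is a minimal sketch of that idea (a hypothetical 
helper, not the actual herder code): compare the raw, unresolved task configs, 
so a stale provider reference can never fail the comparison.
{code:java}
import java.util.List;
import java.util.Map;

public class CompareRawTaskConfigsSketch {

    // Hypothetical helper: task configs are considered changed when their
    // raw values differ; placeholders like ${file:...:topics} are compared
    // as literal strings and never resolved.
    static boolean taskConfigsChanged(List<Map<String, String>> oldRawConfigs,
                                      List<Map<String, String>> newRawConfigs) {
        return !oldRawConfigs.equals(newRawConfigs);
    }

    public static void main(String[] args) {
        Map<String, String> oldTask =
            Map.of("topics", "${file:/opt/kafka/provider.properties:topics}");
        Map<String, String> newTask =
            Map.of("topics", "${file:/opt/kafka/provider2.properties:topics}");
        // Raw values differ, so new task configs would be published without
        // ever touching the old provider.properties reference.
        System.out.println(taskConfigsChanged(List.of(oldTask), List.of(newTask))); // true
    }
}
{code}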
