[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-10 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: (was: 3.9.0)

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider placeholder whose resolution 
> fails with a ConfigException.
> Below is a simple test case that reproduces it with FileConfigProvider, but any 
> ConfigProvider that can raise an exception when something is wrong with the 
> config (e.g. the referenced resource doesn't exist) will trigger the same problem.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
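> For reference, a minimal sketch (the class name and setup are illustrative, not 
> taken from the worker code) of how the file placeholder used in the connector 
> config below is resolved through FileConfigProvider:
> {code:java}
> import java.util.Collections;
> import org.apache.kafka.common.config.ConfigData;
> import org.apache.kafka.common.config.provider.FileConfigProvider;
> 
> public class ResolveTopicsSketch {
>     public static void main(String[] args) throws Exception {
>         // The worker instantiates the provider declared in config.providers.file.class
>         // and asks it for the key referenced by ${file:<path>:<key>}.
>         try (FileConfigProvider provider = new FileConfigProvider()) {
>             provider.configure(Collections.emptyMap());
>             ConfigData data = provider.get("/opt/kafka/provider.properties",
>                     Collections.singleton("topics"));
>             System.out.println(data.data().get("topics")); // prints "test" while the file exists
>         }
>     }
> }
> {code}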
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance, create the file 
> "/opt/kafka/provider.properties" with the content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeded{*} with a 200 response.
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, 
> the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 
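> The trace shows that, while generating new task configs, the worker also transforms 
> the *previous* task configs (AbstractHerder.taskConfigsChanged -> 
> ClusterConfigState.taskConfig), and those still reference the old file name. A 
> minimal standalone sketch (not the herder code itself) that reproduces the same 
> ConfigException once the file has been renamed:
> {code:java}
> import java.util.Collections;
> import org.apache.kafka.common.config.ConfigException;
> import org.apache.kafka.common.config.provider.FileConfigProvider;
> 
> public class MissingProviderFileSketch {
>     public static void main(String[] args) throws Exception {
>         try (FileConfigProvider provider = new FileConfigProvider()) {
>             provider.configure(Collections.emptyMap());
>             // The stored (previous) task config still points at the old path,
>             // so resolving it after the rename fails just like in the log above.
>             provider.get("/opt/kafka/provider.properties", Collections.singleton("topics"));
>         } catch (ConfigException e) {
>             System.out.println(e.getMessage()); // Could not read properties from file ...
>         }
>     }
> }
> {code}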

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: 3.7.1

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.7.1, 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider placeholder whose resolution 
> fails with a ConfigException.
> Below is a simple test case that reproduces it with FileConfigProvider, but any 
> ConfigProvider that can raise an exception when something is wrong with the 
> config (e.g. the referenced resource doesn't exist) will trigger the same problem.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance, create the file 
> "/opt/kafka/provider.properties" with the content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeded{*} with a 200 response.
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, 
> the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-06-04 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-16837:
--
Fix Version/s: 3.8.0

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 3.8.0, 3.9.0
>
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider placeholder whose resolution 
> fails with a ConfigException.
> Below is a simple test case that reproduces it with FileConfigProvider, but any 
> ConfigProvider that can raise an exception when something is wrong with the 
> config (e.g. the referenced resource doesn't exist) will trigger the same problem.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance, create the file 
> "/opt/kafka/provider.properties" with the content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeded{*} with a 200 response.
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, 
> the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
> 

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-24 Thread Sergey Ivanov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Ivanov updated KAFKA-16837:
--
Description: 
Hello,

We faced an issue where it is not possible to update a connector config if the 
*previous* task config contains a ConfigProvider placeholder whose resolution fails 
with a ConfigException.

Below is a simple test case that reproduces it with FileConfigProvider, but any 
ConfigProvider that can raise an exception when something is wrong with the config 
(e.g. the referenced resource doesn't exist) will trigger the same problem.

*Prerequisites:*

Kafka Connect instance with config providers:

 
{code:java}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
 

1. Create Kafka topic "test"
2. On the Kafka Connect instance, create the file 
"/opt/kafka/provider.properties" with the content:
{code:java}
topics=test
{code}
3. Create a simple FileSink connector:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider.properties:topics}"
}
{code}
4. Check that everything works fine:
{code:java}
GET /connectors?expand=info&expand=status
...
"status": {
  "name": "local-file-sink",
  "connector": {
"state": "RUNNING",
"worker_id": "10.10.10.10:8083"
  },
  "tasks": [
{
  "id": 0,
  "state": "RUNNING",
  "worker_id": "10.10.10.10:8083"
}
  ],
  "type": "sink"
}
  }
}
{code}
Looks fine.

5. Rename the file to "/opt/kafka/provider2.properties".
6. Update the connector with the new, correct file name (see the request sketch after the stack trace below):
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}
The update {*}succeeded{*} with a 200 response.
7. Check that everything works fine:
{code:java}
{
  "local-file-sink": {
"info": {
  "name": "local-file-sink",
  "config": {
"connector.class": "FileStreamSink",
"file": "/opt/kafka/test.sink.txt",
"tasks.max": "1",
"topics": "${file:/opt/kafka/provider2.properties:topics}",
"name": "local-file-sink"
  },
  "tasks": [
{
  "connector": "local-file-sink",
  "task": 0
}
  ],
  "type": "sink"
},
"status": {
  "name": "local-file-sink",
  "connector": {
"state": "RUNNING",
"worker_id": "10.10.10.10:8083"
  },
  "tasks": [
{
  "id": 0,
  "state": "FAILED",
  "worker_id": "10.10.10.10:8083",
  "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
}
  ],
  "type": "sink"
}
  }
}
{code}
The config has been updated, but a new task has not been created. As a result, 
the connector doesn't work.

It failed on:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
 [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
Failed to reconfigure connector's tasks (local-file-sink), retrying after 
backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from 
file /opt/kafka/provider.properties
 at 
org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
 at 
org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
 at 
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
 at 
org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
 at 
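
The PUT requests in steps 3 and 6 above can also be issued programmatically. A 
minimal sketch using the JDK HTTP client (the worker address http://localhost:8083 
is an assumption; substitute your own):
{code:java}
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class UpdateConnectorSketch {
    public static void main(String[] args) throws Exception {
        // Connector config from step 6 (points at the renamed provider file).
        String config = """
                {
                  "connector.class": "FileStreamSink",
                  "tasks.max": "1",
                  "file": "/opt/kafka/test.sink.txt",
                  "topics": "${file:/opt/kafka/provider2.properties:topics}"
                }""";
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors/local-file-sink/config"))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(config))
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The request returns 200 even though the subsequent task reconfiguration fails.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
{code}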

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-24 Thread Sergey Ivanov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Ivanov updated KAFKA-16837:
--
Description: 
Hello,

We faced an issue where it is not possible to update a connector config if the 
*previous* task config contains a ConfigProvider placeholder whose resolution fails 
with a ConfigException.

Below is a simple test case that reproduces it with FileConfigProvider, but any 
ConfigProvider that can raise an exception when something is wrong with the config 
(e.g. the referenced resource doesn't exist) will trigger the same problem.

*Prerequisites:*

Kafka Connect instance with config providers:

 
{code:java}
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
 

1. Create Kafka topic "test"
2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties" 
with the content:
{code:java}
topics=test
{code}
3. Create a simple FileSink connector:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider.properties:topics}"
}
{code}
4. Check that everything works fine:
{code:java}
GET /connectors?expand=info&expand=status
...
"status": {
  "name": "local-file-sink",
  "connector": {
"state": "RUNNING",
"worker_id": "10.10.10.10:8083"
  },
  "tasks": [
{
  "id": 0,
  "state": "RUNNING",
  "worker_id": "10.10.10.10:8083"
}
  ],
  "type": "sink"
}
  }
}
{code}
Looks fine.

5. Rename the file to "/opt/kafka/provider2.properties".
6. Update the connector with the new, correct file name:
{code:java}
PUT /connectors/local-file-sink/config
{
  "connector.class": "FileStreamSink",
  "tasks.max": "1",
  "file": "/opt/kafka/test.sink.txt",
  "topics": "${file:/opt/kafka/provider2.properties:topics}"
}
{code}
The update {*}succeeded{*} with a 200 response.
7. Check that everything works fine:
{code:java}
{
  "local-file-sink": {
"info": {
  "name": "local-file-sink",
  "config": {
"connector.class": "FileStreamSink",
"file": "/opt/kafka/test.sink.txt",
"tasks.max": "1",
"topics": "${file:/opt/kafka/provider2.properties:topics}",
"name": "local-file-sink"
  },
  "tasks": [
{
  "connector": "local-file-sink",
  "task": 0
}
  ],
  "type": "sink"
},
"status": {
  "name": "local-file-sink",
  "connector": {
"state": "RUNNING",
"worker_id": "10.10.10.10:8083"
  },
  "tasks": [
{
  "id": 0,
  "state": "FAILED",
  "worker_id": "10.10.10.10:8083",
  "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
}
  ],
  "type": "sink"
}
  }
}
{code}
The config has been updated, but a new task has not been created. As a result, 
the connector doesn't work.

It failed on:
{code:java}
[2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
 [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
Failed to reconfigure connector's tasks (local-file-sink), retrying after 
backoff.
org.apache.kafka.common.config.ConfigException: Could not read properties from 
file /opt/kafka/provider.properties
 at 
org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
 at 
org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
 at 
org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
 at 
org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
 at 
org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.publishConnectorTaskConfigs(DistributedHerder.java:2089)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:2082)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithExponentialBackoffRetries(DistributedHerder.java:2025)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$null$42(DistributedHerder.java:2038)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.runRequest(DistributedHerder.java:2232)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:470)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:371)
 at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
 at 

[jira] [Updated] (KAFKA-16837) Kafka Connect fails on update connector for incorrect previous Config Provider tasks

2024-05-24 Thread Sergey Ivanov (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-16837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sergey Ivanov updated KAFKA-16837:
--
Attachment: kafka_connect_config.png

> Kafka Connect fails on update connector for incorrect previous Config 
> Provider tasks
> 
>
> Key: KAFKA-16837
> URL: https://issues.apache.org/jira/browse/KAFKA-16837
> Project: Kafka
>  Issue Type: Bug
>  Components: connect
>Affects Versions: 3.5.1, 3.6.1, 3.8.0
>Reporter: Sergey Ivanov
>Priority: Major
> Attachments: kafka_connect_config.png
>
>
> Hello,
> We faced an issue where it is not possible to update a connector config if the 
> *previous* task config contains a ConfigProvider placeholder whose resolution 
> fails with a ConfigException.
> Below is a simple test case that reproduces it with FileConfigProvider, but any 
> ConfigProvider that can raise an exception when something is wrong with the 
> config (e.g. the referenced resource doesn't exist) will trigger the same problem.
> *Prerequisites:*
> Kafka Connect instance with config providers:
>  
> {code:java}
> config.providers=file
> config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider{code}
>  
> 1. Create Kafka topic "test"
> 2. On the Kafka Connect instance, create the file "/opt/kafka/provider.properties" 
> with the content:
> {code:java}
> topics=test
> {code}
> 3. Create a simple FileSink connector:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider.properties:topics}"
> }
> {code}
> 4. Check that everything works fine:
> {code:java}
> GET /connectors?expand=info&expand=status
> ...
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "RUNNING",
>   "worker_id": "10.10.10.10:8083"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> Looks fine.
> 5. Rename the file to "/opt/kafka/provider2.properties".
> 6. Update the connector with the new, correct file name:
> {code:java}
> PUT /connectors/local-file-sink/config
> {
>   "connector.class": "FileStreamSink",
>   "tasks.max": "1",
>   "file": "/opt/kafka/test.sink.txt",
>   "topics": "${file:/opt/kafka/provider2.properties:topics}"
> }
> {code}
> The update {*}succeeded{*} with a 200 response.
> 7. Check that everything works fine:
> {code:java}
> {
>   "local-file-sink": {
> "info": {
>   "name": "local-file-sink",
>   "config": {
> "connector.class": "FileStreamSink",
> "file": "/opt/kafka/test.sink.txt",
> "tasks.max": "1",
> "topics": "${file:/opt/kafka/provider2.properties:topics}",
> "name": "local-file-sink"
>   },
>   "tasks": [
> {
>   "connector": "local-file-sink",
>   "task": 0
> }
>   ],
>   "type": "sink"
> },
> "status": {
>   "name": "local-file-sink",
>   "connector": {
> "state": "RUNNING",
> "worker_id": "10.10.10.10:8083"
>   },
>   "tasks": [
> {
>   "id": 0,
>   "state": "FAILED",
>   "worker_id": "10.10.10.10:8083",
>   "trace": "org.apache.kafka.common.errors.InvalidTopicException: 
> Invalid topics: [${file:/opt/kafka/provider.properties:topics}]"
> }
>   ],
>   "type": "sink"
> }
>   }
> }
> {code}
> The config has been updated, but a new task has not been created. As a result, 
> the connector doesn't work.
> It failed on:
> {code:java}
> [2024-05-24T12:08:24.362][ERROR][request_id= ][tenant_id= 
> ][thread=DistributedHerder-connect-1-1][class=org.apache.kafka.connect.runtime.distributed.DistributedHerder][method=lambda$reconfigureConnectorTasksWithExponentialBackoffRetries$44]
>  [Worker clientId=connect-1, groupId=streaming-service_streaming_service] 
> Failed to reconfigure connector's tasks (local-file-sink), retrying after 
> backoff.
> org.apache.kafka.common.config.ConfigException: Could not read properties 
> from file /opt/kafka/provider.properties
>  at 
> org.apache.kafka.common.config.provider.FileConfigProvider.get(FileConfigProvider.java:98)
>  at 
> org.apache.kafka.common.config.ConfigTransformer.transform(ConfigTransformer.java:103)
>  at 
> org.apache.kafka.connect.runtime.WorkerConfigTransformer.transform(WorkerConfigTransformer.java:58)
>  at 
> org.apache.kafka.connect.storage.ClusterConfigState.taskConfig(ClusterConfigState.java:181)
>  at 
> org.apache.kafka.connect.runtime.AbstractHerder.taskConfigsChanged(AbstractHerder.java:804)
>  at 
>