[ 
https://issues.apache.org/jira/browse/KAFKA-17719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chaitanya Mukka updated KAFKA-17719:
------------------------------------
    Description: 
The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) 
alters the logic for materializing a view of the config topic to ignore task 
configs when there is no configuration for that connector present earlier in 
the config topic. However, the logic fails to consider topics that might get 
compacted over time.

In particular, when we have a connector {{C1}} running fine, the records in the 
config topic for the connector will look something like {{C1, T1, T2, 
Task-commit-record}}.

If the connector gets a config update that doesn't produce any new task configs 
(note that this is a valid case when there are no task config changes [1]), we 
only produce a connector config record [2]. The config topic now looks like 
{{C1, T1, T2, Task-commit-record, C1}}. However, if the topic gets compacted, 
the older {{C1}} record is removed and we end up with {{T1, T2, 
Task-commit-record, C1}}. This can be a common scenario in large, long-running 
Connect clusters.
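
The effect of compaction on record order can be sketched in a few lines of Java. This is only an illustrative model, not Kafka's log cleaner: the class and method names are hypothetical, and each record is reduced to its key, using the labels from the example above.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // Keep only the latest record for each key, preserving the relative
    // order of the surviving records (a simplified model of compaction).
    static List<String> compact(List<String> keys) {
        Map<String, Integer> lastIndex = new HashMap<>();
        for (int i = 0; i < keys.size(); i++) {
            lastIndex.put(keys.get(i), i);
        }
        List<String> survivors = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            if (lastIndex.get(keys.get(i)) == i) {
                survivors.add(keys.get(i));
            }
        }
        return survivors;
    }

    public static void main(String[] args) {
        // Config topic after the connector config update, before compaction.
        List<String> log = List.of("C1", "T1", "T2", "Task-commit-record", "C1");
        // Prints [T1, T2, Task-commit-record, C1]
        System.out.println(compact(log));
    }
}
```

The surviving connector config record now sits after the task configs, which is the ordering that the KAFKA-16838 logic does not expect.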

Based on the changes for KAFKA-16838, when the Connect worker reads this config 
state it ignores the task configs [3] for the connector, because after 
compaction no connector config record precedes them. The connector is still 
active, however, and may have active task assignments. The symptom of this 
issue is an NPE that shows up when trying to start the tasks: 
{noformat}
java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because "inputMap" is null
        at org.apache.kafka.common.utils.Utils.castToStringObjectMap
        at org.apache.kafka.common.config.AbstractConfig.<init>
        at org.apache.kafka.common.config.AbstractConfig.<init>
        at org.apache.kafka.connect.runtime.TaskConfig.<init>
        at org.apache.kafka.connect.runtime.Worker.startTask
        at org.apache.kafka.connect.runtime.Worker.startSourceTask
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41
        at java.base/java.util.concurrent.FutureTask.run
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run
        at java.base/java.lang.Thread.run(Thread.java:1583){noformat}
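
As a rough illustration of how the task configs go missing, the Java sketch below models a reader that drops any task config not preceded by a config for its connector. It is a simplified stand-in, not the actual {{KafkaConfigBackingStore}} code: the {{Rec}} type, the method name, the task IDs and the config values are all hypothetical, and task commit records are left out for brevity.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MaterializeSketch {
    // Hypothetical record model: kind is "connector" or "task".
    static final class Rec {
        final String kind;
        final String connector;
        final String taskId;
        final Map<String, String> config;

        Rec(String kind, String connector, String taskId, Map<String, String> config) {
            this.kind = kind;
            this.connector = connector;
            this.taskId = taskId;
            this.config = config;
        }
    }

    // Build a taskId -> config view, ignoring task configs whose connector
    // has not appeared earlier in the log (the assumed post-KAFKA-16838 rule).
    static Map<String, Map<String, String>> materializeTaskConfigs(List<Rec> log) {
        Set<String> knownConnectors = new HashSet<>();
        Map<String, Map<String, String>> taskConfigs = new HashMap<>();
        for (Rec r : log) {
            if (r.kind.equals("connector")) {
                knownConnectors.add(r.connector);
            } else if (r.kind.equals("task") && knownConnectors.contains(r.connector)) {
                taskConfigs.put(r.taskId, r.config);
            }
        }
        return taskConfigs;
    }

    public static void main(String[] args) {
        Map<String, String> taskCfg = Map.of("task.class", "ExampleTask");
        // Compacted order: task configs first, connector config last.
        List<Rec> compacted = List.of(
                new Rec("task", "C1", "C1-0", taskCfg),
                new Rec("task", "C1", "C1-1", taskCfg),
                new Rec("connector", "C1", null, Map.of("name", "C1")));
        Map<String, String> config = materializeTaskConfigs(compacted).get("C1-0");
        // Prints true: a caller that passes this on without a null check
        // would hit an NPE like the one in the trace above.
        System.out.println(config == null);
    }
}
```

With the uncompacted order (connector config first), the same method returns both task configs, so the null only appears once compaction has removed the earlier connector record.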
 

[1] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047]
[2] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524]
[3] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074]

  was:
The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) 
alters the logic for materializing a view of the config topic to ignore task 
configs when there is no configuration for that connector present earlier in 
the config topic. However, the logic fails to consider topics that might get 
compacted over time.

In particular, when we have a connector {{C1}} running fine, the records in the 
config topic for the connector will look something like {{{}C1, T1, T2, 
Task-commit-record{}}}.

If the connector gets a config update that doesn't produce any new task configs 
(note that this is a valid case when there are no task config changes[1]) we 
only produce a Connector config record [2]. The config topic now looks like 
{{{}C1, T1, T2, Task-commit-record, C1{}}}. However, if the topic gets 
compacted we will end up with {{{}T1, T2, Task-commit-record, C1{}}}. This can 
be a common scenario in large and old connect clusters.

Based on the changes for KAFKA-16838, when the connect worker reads this config 
state it ignores the task configs for this while the connector is still active 
and we might have active assignments for the same. The symptom of this issue is 
an NPE which shows up when trying to start the tasks: 
{noformat}
java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because "inputMap" is null
        at org.apache.kafka.common.utils.Utils.castToStringObjectMap
        at org.apache.kafka.common.config.AbstractConfig.<init>
        at org.apache.kafka.common.config.AbstractConfig.<init>
        at org.apache.kafka.connect.runtime.TaskConfig.<init>
        at org.apache.kafka.connect.runtime.Worker.startTask
        at org.apache.kafka.connect.runtime.Worker.startSourceTask
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41
        at java.base/java.util.concurrent.FutureTask.run
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run
        at java.base/java.lang.Thread.run(Thread.java:1583){noformat}
 

[1] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047]
[2] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524]


> Connect may fail to start tasks when reading from a compacted config topic
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-17719
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17719
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>            Reporter: Chaitanya Mukka
>            Priority: Major
>
> The fix for KAFKA-16838 (via [https://github.com/apache/kafka/pull/16122]) 
> alters the logic for materializing a view of the config topic to ignore task 
> configs when there is no configuration for that connector present earlier in 
> the config topic. However, the logic fails to consider topics that might get 
> compacted over time.
> In particular, when we have a connector {{C1}} running fine, the records in 
> the config topic for the connector will look something like {{C1, T1, T2, 
> Task-commit-record}}.
> If the connector gets a config update that doesn't produce any new task 
> configs (note that this is a valid case when there are no task config 
> changes [1]), we only produce a connector config record [2]. The config topic 
> now looks like {{C1, T1, T2, Task-commit-record, C1}}. However, if the topic 
> gets compacted, the older {{C1}} record is removed and we end up with {{T1, 
> T2, Task-commit-record, C1}}. This can be a common scenario in large, 
> long-running Connect clusters.
> Based on the changes for KAFKA-16838, when the Connect worker reads this 
> config state it ignores the task configs [3] for the connector, because after 
> compaction no connector config record precedes them. The connector is still 
> active, however, and may have active task assignments. The symptom of this 
> issue is an NPE that shows up when trying to start the tasks: 
> {noformat}
> java.lang.NullPointerException: Cannot invoke "java.util.Map.size()" because "inputMap" is null
>       at org.apache.kafka.common.utils.Utils.castToStringObjectMap
>       at org.apache.kafka.common.config.AbstractConfig.<init>
>       at org.apache.kafka.common.config.AbstractConfig.<init>
>       at org.apache.kafka.connect.runtime.TaskConfig.<init>
>       at org.apache.kafka.connect.runtime.Worker.startTask
>       at org.apache.kafka.connect.runtime.Worker.startSourceTask
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask
>       at org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$41
>       at java.base/java.util.concurrent.FutureTask.run
>       at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker
>       at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run
>       at java.base/java.lang.Thread.run(Thread.java:1583){noformat}
>  
> [1] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java#L1047]
> [2] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L524]
> [3] - [https://github.com/apache/kafka/blob/trunk/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java#L1072-L1074]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
