Aljoscha Pörtner created KAFKA-13611:
----------------------------------------

             Summary: Failed reconfiguration of tasks can cause missing offset 
replications in MirrorCheckpointConnector
                 Key: KAFKA-13611
                 URL: https://issues.apache.org/jira/browse/KAFKA-13611
             Project: Kafka
          Issue Type: Improvement
          Components: mirrormaker
    Affects Versions: 3.0.0, 2.8.1, 2.7.2, 2.6.3, 3.1.0, 2.6.2, 2.7.1, 2.8.0, 
2.6.1, 2.7.0, 2.5.1, 2.6.0, 2.4.1, 2.5.0, 2.4.0
            Reporter: Aljoscha Pörtner


Because _knownConsumerGroups_ is cached in a field and not re-queried each 
time _refreshConsumerGroups_ runs, errors during the task reconfiguration go 
unnoticed, and the reconfiguration is not retried until a new consumer group 
is added. This can lead to missing offset updates in the target cluster: the 
affected consumer group is not picked up by any task until a completely new 
consumer group is added and the resulting task reconfiguration succeeds.

 
{code:java}
    private void refreshConsumerGroups()
            throws InterruptedException, ExecutionException {
        List<String> consumerGroups = findConsumerGroups();
        Set<String> newConsumerGroups = new HashSet<>();
        newConsumerGroups.addAll(consumerGroups);
        newConsumerGroups.removeAll(knownConsumerGroups);
        Set<String> deadConsumerGroups = new HashSet<>();
        deadConsumerGroups.addAll(knownConsumerGroups);
        deadConsumerGroups.removeAll(consumerGroups);
        if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
            log.info("Found {} consumer groups for {}. {} are new. {} were removed. Previously had {}.",
                    consumerGroups.size(), sourceAndTarget, newConsumerGroups.size(), deadConsumerGroups.size(),
                    knownConsumerGroups.size());
            log.debug("Found new consumer groups: {}", newConsumerGroups);
            knownConsumerGroups = consumerGroups;
            context.requestTaskReconfiguration();
        }
    }
{code}
[Code|https://github.com/apache/kafka/blob/ca37f14076adbaa302a558a750be197c202f1038/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointConnector.java#L124]
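
The sequence described above can be reproduced with a small standalone sketch. 
This is not the connector code: the class _StaleKnownGroupsDemo_, the 
_reconfigurationFails_ switch, the _groupsAssignedToTasks_ set and the 
synchronous _requestTaskReconfiguration(...)_ stand-in are all assumptions made 
for illustration (in Kafka Connect the reconfiguration is requested through the 
connector context and happens later). Only the diffing logic and the point at 
which _knownConsumerGroups_ is overwritten follow the snippet quoted above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the behaviour reported in this issue.
public class StaleKnownGroupsDemo {
    // Mirrors the connector's cached field.
    List<String> knownConsumerGroups = new ArrayList<>();
    // Assumption: the groups that tasks are actually configured for.
    Set<String> groupsAssignedToTasks = new HashSet<>();
    // Assumption: switch used to simulate a failing task reconfiguration.
    boolean reconfigurationFails = false;
    int reconfigurationRequests = 0;

    // Stand-in for context.requestTaskReconfiguration(); applied synchronously here.
    void requestTaskReconfiguration(List<String> groups) {
        reconfigurationRequests++;
        if (!reconfigurationFails) {
            groupsAssignedToTasks = new HashSet<>(groups);
        }
    }

    // Same diffing logic as the quoted method, with the group list passed in.
    void refreshConsumerGroups(List<String> consumerGroups) {
        Set<String> newConsumerGroups = new HashSet<>(consumerGroups);
        newConsumerGroups.removeAll(knownConsumerGroups);
        Set<String> deadConsumerGroups = new HashSet<>(knownConsumerGroups);
        deadConsumerGroups.removeAll(consumerGroups);
        if (!newConsumerGroups.isEmpty() || !deadConsumerGroups.isEmpty()) {
            // The cache is overwritten whether or not the reconfiguration succeeds.
            knownConsumerGroups = consumerGroups;
            requestTaskReconfiguration(consumerGroups);
        }
    }

    public static void main(String[] args) {
        StaleKnownGroupsDemo demo = new StaleKnownGroupsDemo();

        // Refresh 1: group-a appears, but the reconfiguration fails.
        demo.reconfigurationFails = true;
        demo.refreshConsumerGroups(List.of("group-a"));

        // Refresh 2: the failure is over, yet no diff is detected, so nothing is retried.
        demo.reconfigurationFails = false;
        demo.refreshConsumerGroups(List.of("group-a"));
        System.out.println("requests after refresh 2: " + demo.reconfigurationRequests);
        System.out.println("assigned after refresh 2: " + demo.groupsAssignedToTasks);

        // Refresh 3: only a newly added group triggers another reconfiguration.
        demo.refreshConsumerGroups(List.of("group-a", "group-b"));
        System.out.println("requests after refresh 3: " + demo.reconfigurationRequests);
        System.out.println("assigned after refresh 3: " + demo.groupsAssignedToTasks);
    }
}
```

In this model, group-a stays unassigned after the second refresh even though 
the simulated failure has been cleared; it is only picked up once group-b 
appears and a further reconfiguration is requested.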

For an example of how the problem can be triggered, see the following 
[issue|https://github.com/strimzi/strimzi-kafka-operator/issues/3688].



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
