git-enzo opened a new issue #7556:
URL: https://github.com/apache/pulsar/issues/7556


   **Describe the bug**
   In the MultiTopicsConsumerImpl class it is possible to corrupt the internal 
state while subscribing to topics. This can happen in the 
```doSubscribeTopicPartitions()``` method, which is invoked during 
subscription, for example from the ```subscribeAsync()``` method.
   
   The MultiTopicsConsumerImpl class stores the number of partitions 
for each subscribed topic. This state is kept in the following map: 
```protected final ConcurrentHashMap<String, Integer> topics```. There is also 
another variable (```AtomicInteger allTopicPartitionsNumber```) that stores the 
sum of the partition counts of all subscribed topics, and a map of consumers 
for particular topics.
   
   When we subscribe to a new topic, it is added to the topics map, and then 
the ```allTopicPartitionsNumber``` counter is incremented:
   ```
   this.topics.putIfAbsent(topicName, numPartitions);
   allTopicPartitionsNumber.addAndGet(numPartitions);
   ``` 
   or
   ```
   this.topics.putIfAbsent(topicName, 1);
   allTopicPartitionsNumber.incrementAndGet();
   ```
   
   The new consumer is also added to the consumers map.
   
   The problem may occur when the _same topic_ (topic name) is subscribed 
twice at _the same time_. In this case, in one of the subscribe operations the 
topic is not added to the topics map, because of the ```putIfAbsent()``` 
method, but the counter is still incremented. If we assume that two separate 
subscribe operations run at the same time, for example because they are 
asynchronous, then both of them may fail, because the state becomes 
inconsistent and the following check fails:
   ```
   int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum();
   checkState(allTopicPartitionsNumber.get() == numTopics,
       "allTopicPartitionsNumber " + allTopicPartitionsNumber.get()
           + " not equals expected: " + numTopics);
   ```
   
   This causes the internal consumer to be closed and removed from the 
consumers map, so the topic is not subscribed.
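
   The counter drift described above can be shown without any threads, by 
running the two quoted statements twice for the same topic name, as two 
overlapping subscribe operations would. This is only a minimal sketch and not 
the actual Pulsar code: the class ```TopicCounterSketch``` and its methods 
```registerUnconditionally()```, ```registerIfAbsent()``` and 
```sumOfMapValues()``` are hypothetical names introduced for illustration. 
Only the two field names are taken from MultiTopicsConsumerImpl.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the two pieces of state described in this issue.
class TopicCounterSketch {
    final ConcurrentHashMap<String, Integer> topics = new ConcurrentHashMap<>();
    final AtomicInteger allTopicPartitionsNumber = new AtomicInteger();

    // Mirrors the statements quoted in the issue: the counter is increased
    // even when putIfAbsent() found an existing entry and inserted nothing.
    void registerUnconditionally(String topicName, int numPartitions) {
        topics.putIfAbsent(topicName, numPartitions);
        allTopicPartitionsNumber.addAndGet(numPartitions);
    }

    // Assumed alternative (not a confirmed fix): count the partitions only
    // if this call actually inserted the topic. putIfAbsent() returns null
    // when there was no previous mapping for the key.
    boolean registerIfAbsent(String topicName, int numPartitions) {
        boolean inserted = topics.putIfAbsent(topicName, numPartitions) == null;
        if (inserted) {
            allTopicPartitionsNumber.addAndGet(numPartitions);
        }
        return inserted;
    }

    // Same sum that the checkState() call in the issue compares against.
    int sumOfMapValues() {
        return topics.values().stream().mapToInt(Integer::intValue).sum();
    }

    public static void main(String[] args) {
        TopicCounterSketch unguarded = new TopicCounterSketch();
        unguarded.registerUnconditionally("my-topic", 4);
        unguarded.registerUnconditionally("my-topic", 4); // second, overlapping subscribe
        System.out.println(unguarded.allTopicPartitionsNumber.get()
                + " vs " + unguarded.sumOfMapValues()); // prints "8 vs 4"

        TopicCounterSketch guarded = new TopicCounterSketch();
        guarded.registerIfAbsent("my-topic", 4);
        guarded.registerIfAbsent("my-topic", 4); // no insert, counter untouched
        System.out.println(guarded.allTopicPartitionsNumber.get()
                + " vs " + guarded.sumOfMapValues()); // prints "4 vs 4"
    }
}
```

   Note that even the guarded variant updates the map and the counter in two 
separate steps, so another thread could still observe them briefly out of 
sync. It only guarantees that the two values agree once all registrations 
have finished.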
   
   I think this behavior is unexpected, because if we run the same two 
operations sequentially, everything is fine and the topic is subscribed. To 
sum up, since the ```doSubscribeTopicPartitions()``` method is invoked from 
the ```subscribeAsync()``` method, which is supposed to be asynchronous, 
concurrent subscriptions to the same topic should not fail this way.
   



