f1amingo opened a new issue, #10375:
URL: https://github.com/apache/rocketmq/issues/10375

   ### Before Creating the Bug Report
   
   - [x] I found a bug; this is not just a question (questions should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions)).
   
   - [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
   
   - [x] I have confirmed that this bug belongs to the current repository, not 
other repositories of RocketMQ.
   
   
   ### Runtime platform environment
   
   Linux/macOS (all platforms; this is a code-level concurrency bug)
   
   ### RocketMQ version
   
   5.5.0 (develop branch, but the bug exists in all versions with 
`ConsumeQueueStore`)
   
   ### JDK Version
   
   All (not JDK-specific)
   
   ### Describe the Bug
   
   There is a race condition between `AbstractConsumeQueueStore#deleteTopic` 
and `ConsumeQueueStore.FlushConsumeQueueService#doFlush`. When a topic is being 
deleted, the flush thread may inadvertently recreate the deleted topic's 
ConsumeQueue entries in `consumeQueueTable` via `findOrCreateConsumeQueue`.
   
   The root cause is that `flush(ConsumeQueueInterface, int)` calls `getLifeCycle(topic, queueId)`, which delegates to `findOrCreateConsumeQueue(topic, queueId)`. If the topic is removed from `consumeQueueTable` between the moment the flush thread obtains the `ConsumeQueueInterface` reference (through iteration) and the moment it calls `getLifeCycle`, `findOrCreateConsumeQueue` will recreate the topic entry.
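
   The create-on-miss behavior can be illustrated with a minimal, hypothetical model. It assumes `findOrCreateConsumeQueue` behaves like a two-level `computeIfAbsent` over a `topic -> (queueId -> queue)` map; the real method's internals may differ, and `FindOrCreateModel` and `FakeConsumeQueue` are illustrative stand-ins, not RocketMQ classes. It only shows that a by-key lookup made after removal puts the topic back and returns a different object than the one the caller was holding.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical model of the create-on-miss lookup; these are NOT RocketMQ classes.
   public class FindOrCreateModel {

       static final class FakeConsumeQueue {
           final String topic;
           final int queueId;

           FakeConsumeQueue(String topic, int queueId) {
               this.topic = topic;
               this.queueId = queueId;
           }
       }

       // topic -> (queueId -> queue), the same two-level shape as consumeQueueTable
       final ConcurrentMap<String, ConcurrentMap<Integer, FakeConsumeQueue>> table =
           new ConcurrentHashMap<>();

       // Assumed semantics: a missing topic map or queue is created and inserted
       FakeConsumeQueue findOrCreate(String topic, int queueId) {
           ConcurrentMap<Integer, FakeConsumeQueue> queues =
               table.computeIfAbsent(topic, t -> new ConcurrentHashMap<>());
           return queues.computeIfAbsent(queueId, id -> new FakeConsumeQueue(topic, id));
       }

       // True if a lookup made after removal puts the topic back with a new queue object
       public static boolean lookupAfterRemoveRecreatesTopic() {
           FindOrCreateModel store = new FindOrCreateModel();
           FakeConsumeQueue held = store.findOrCreate("TopicA", 0); // reference held by the flush thread
           store.table.remove("TopicA");                            // deleteTopic has finished
           FakeConsumeQueue looked = store.findOrCreate(held.topic, held.queueId);
           return store.table.containsKey("TopicA") && looked != held;
       }

       public static void main(String[] args) {
           System.out.println(lookupAfterRemoveRecreatesTopic()); // prints "true"
       }
   }
   ```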
   
   ### Steps to Reproduce
   
   1. Thread A: Call `deleteTopic(topic)` which iterates and destroys all 
queues, then calls `consumeQueueTable.remove(topic)`
   2. Thread B (`FlushConsumeQueueService#doFlush`): Iterates 
`consumeQueueTable.values()` and obtains a reference to a 
`ConsumeQueueInterface` belonging to the topic being deleted
   3. Thread A completes `consumeQueueTable.remove(topic)`
   4. Thread B: Calls `flush(cq, flushConsumeQueueLeastPages)` → 
`getLifeCycle(cq.getTopic(), cq.getQueueId())` → 
`findOrCreateConsumeQueue(topic, queueId)`
   5. `findOrCreateConsumeQueue` sees that the topic is absent from `consumeQueueTable` and **recreates** the ConsumeQueue, putting a new instance back into the map
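
   The interleaving in the steps above can be forced deterministically with two `CountDownLatch`es. This is a sketch on hypothetical stand-ins (`ZombieTopicRepro`, `FakeConsumeQueue`, and a `computeIfAbsent`-based lookup assumed to mimic `findOrCreateConsumeQueue`); it omits queue destruction and everything else `deleteTopic` does besides the `remove`.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   import java.util.concurrent.CountDownLatch;

   // Hypothetical two-thread replay of steps 1-5; the classes are stand-ins, not RocketMQ code.
   public class ZombieTopicRepro {

       static final class FakeConsumeQueue {
           final String topic;
           final int queueId;

           FakeConsumeQueue(String topic, int queueId) {
               this.topic = topic;
               this.queueId = queueId;
           }
       }

       // Assumed create-on-miss lookup, standing in for findOrCreateConsumeQueue
       static FakeConsumeQueue findOrCreate(
               ConcurrentMap<String, ConcurrentMap<Integer, FakeConsumeQueue>> table,
               String topic, int queueId) {
           return table.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                       .computeIfAbsent(queueId, id -> new FakeConsumeQueue(topic, id));
       }

       // True if the deleted topic is present in the table once both threads finish
       public static boolean reproduce() {
           ConcurrentMap<String, ConcurrentMap<Integer, FakeConsumeQueue>> table =
               new ConcurrentHashMap<>();
           findOrCreate(table, "TopicA", 0);

           CountDownLatch referenceTaken = new CountDownLatch(1);
           CountDownLatch topicRemoved = new CountDownLatch(1);

           // Thread B: the flush thread
           Thread flusher = new Thread(() -> {
               FakeConsumeQueue cq = null;
               for (ConcurrentMap<Integer, FakeConsumeQueue> queues : table.values()) { // step 2
                   for (FakeConsumeQueue q : queues.values()) {
                       cq = q;
                   }
               }
               referenceTaken.countDown();
               try {
                   topicRemoved.await(); // wait until step 3 has happened
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   return;
               }
               findOrCreate(table, cq.topic, cq.queueId); // steps 4-5: re-lookup by key
           });
           flusher.start();

           // Thread A: deleteTopic (queue destruction is omitted in this model)
           try {
               referenceTaken.await();
               table.remove("TopicA"); // steps 1 and 3
               topicRemoved.countDown();
               flusher.join();
           } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
               throw new IllegalStateException(e);
           }
           return table.containsKey("TopicA");
       }

       public static void main(String[] args) {
           System.out.println(reproduce()); // prints "true": the deleted topic came back
       }
   }
   ```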
   
   ### What Did You Expect to See?
   
   After `deleteTopic` completes, the topic should no longer exist in 
`consumeQueueTable`, and the flush thread should not recreate it.
   
   ### What Did You See Instead?
   
   The deleted topic gets re-inserted into `consumeQueueTable` by the flush 
thread through `findOrCreateConsumeQueue`, causing a "zombie" ConsumeQueue that 
was supposed to be deleted.
   
   ### Additional Context
   
   Relevant code path:
   
   ```java
   // ConsumeQueueStore.java line 335-338
   public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
       // Re-resolves the queue by (topic, queueId) instead of using consumeQueue directly
       FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
       return fileQueueLifeCycle.flush(flushLeastPages);
   }
   
   // ConsumeQueueStore.java line 208-210
   private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
       return findOrCreateConsumeQueue(topic, queueId);
   }
   ```
   
   A possible fix would be to have `flush(ConsumeQueueInterface, int)` directly 
cast and flush the passed-in `consumeQueue` object instead of re-looking it up 
via `findOrCreateConsumeQueue`, or add a deletion-aware check before calling 
`findOrCreateConsumeQueue`.
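
   Both suggested fixes can be sketched on hypothetical stand-ins (`FlushFixSketch`, `FakeConsumeQueue`; neither is RocketMQ code). Option 1 flushes the object the caller already holds; option 2 resolves the queue with plain `get` calls, so a deleted topic is skipped instead of recreated. Returning `false` for a skipped flush is an arbitrary choice of the sketch, not the store's contract.

   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;

   // Hypothetical sketch of the two suggested fixes, on stand-in classes (not RocketMQ code).
   public class FlushFixSketch {

       static final class FakeConsumeQueue {
           final String topic;
           final int queueId;
           int flushCalls; // counts flushes so the sketch can check which path ran

           FakeConsumeQueue(String topic, int queueId) {
               this.topic = topic;
               this.queueId = queueId;
           }

           boolean flush(int flushLeastPages) {
               flushCalls++;
               return true;
           }
       }

       final ConcurrentMap<String, ConcurrentMap<Integer, FakeConsumeQueue>> table =
           new ConcurrentHashMap<>();

       // Option 1: flush the reference the caller already holds; the table is never touched
       boolean flushDirect(FakeConsumeQueue cq, int flushLeastPages) {
           return cq.flush(flushLeastPages);
       }

       // Option 2: deletion-aware lookup with get() only; a missing entry is skipped, never created
       boolean flushIfPresent(FakeConsumeQueue cq, int flushLeastPages) {
           ConcurrentMap<Integer, FakeConsumeQueue> queues = table.get(cq.topic);
           FakeConsumeQueue current = queues == null ? null : queues.get(cq.queueId);
           return current != null && current.flush(flushLeastPages);
       }

       // True if neither option puts a removed topic back into the table
       public static boolean neitherOptionRecreatesTopic() {
           FlushFixSketch store = new FlushFixSketch();
           FakeConsumeQueue cq = new FakeConsumeQueue("TopicA", 0);
           store.table.computeIfAbsent("TopicA", t -> new ConcurrentHashMap<>()).put(0, cq);
           store.table.remove("TopicA"); // deleteTopic has finished
           store.flushDirect(cq, 0);     // flushes the held object once
           boolean skipped = !store.flushIfPresent(cq, 0);
           return skipped && !store.table.containsKey("TopicA") && cq.flushCalls == 1;
       }

       public static void main(String[] args) {
           System.out.println(neitherOptionRecreatesTopic()); // prints "true"
       }
   }
   ```

   In this model neither option reinserts the topic. Both still leave a narrower window in which a flush can reach a queue that `deleteTopic` is destroying (option 1 always flushes the held reference; option 2 can lose a race between its `get` and the `flush` call), so whether a flush after `destroy()` is harmless would need to be checked against the real `ConsumeQueue`.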

