f1amingo opened a new issue, #10375: https://github.com/apache/rocketmq/issues/10375
### Before Creating the Bug Report

- [x] I found a bug, not just asking a question, which should be created in [GitHub Discussions](https://github.com/apache/rocketmq/discussions).
- [x] I have searched the [GitHub Issues](https://github.com/apache/rocketmq/issues) and [GitHub Discussions](https://github.com/apache/rocketmq/discussions) of this repository and believe that this is not a duplicate.
- [x] I have confirmed that this bug belongs to the current repository, not other repositories of RocketMQ.

### Runtime platform environment

Linux/macOS (all platforms; this is a code-level concurrency bug)

### RocketMQ version

5.5.0 (develop branch, but the bug exists in all versions with `ConsumeQueueStore`)

### JDK Version

All (not JDK-specific)

### Describe the Bug

There is a race condition between `AbstractConsumeQueueStore#deleteTopic` and `ConsumeQueueStore.FlushConsumeQueueService#doFlush`. While a topic is being deleted, the flush thread can inadvertently recreate the deleted topic's ConsumeQueue entries in `consumeQueueTable` via `findOrCreateConsumeQueue`.

The root cause is that `flush(ConsumeQueueInterface, int)` calls `getLifeCycle(topic, queueId)`, which delegates to `findOrCreateConsumeQueue(topic, queueId)`. If the topic is removed from `consumeQueueTable` after the flush thread has obtained the `ConsumeQueueInterface` reference (through iteration) but before it calls `getLifeCycle`, `findOrCreateConsumeQueue` recreates the topic entry.

### Steps to Reproduce

1. Thread A calls `deleteTopic(topic)`, which iterates over and destroys all queues of the topic, then calls `consumeQueueTable.remove(topic)`.
2. Thread B (`FlushConsumeQueueService#doFlush`) iterates `consumeQueueTable.values()` and obtains a reference to a `ConsumeQueueInterface` belonging to the topic being deleted.
3. Thread A completes `consumeQueueTable.remove(topic)`.
4. Thread B calls `flush(cq, flushConsumeQueueLeastPages)` → `getLifeCycle(cq.getTopic(), cq.getQueueId())` → `findOrCreateConsumeQueue(topic, queueId)`.
5. `findOrCreateConsumeQueue` sees that the topic is absent from `consumeQueueTable`, **recreates** a new ConsumeQueue, and puts it back into the map.

### What Did You Expect to See?

After `deleteTopic` completes, the topic should no longer exist in `consumeQueueTable`, and the flush thread should not recreate it.

### What Did You See Instead?

The deleted topic is re-inserted into `consumeQueueTable` by the flush thread through `findOrCreateConsumeQueue`, leaving a "zombie" ConsumeQueue that was supposed to be deleted.

### Additional Context

Relevant code path:

```java
// ConsumeQueueStore.java, lines 335-338
public boolean flush(ConsumeQueueInterface consumeQueue, int flushLeastPages) {
    FileQueueLifeCycle fileQueueLifeCycle = getLifeCycle(consumeQueue.getTopic(), consumeQueue.getQueueId());
    return fileQueueLifeCycle.flush(flushLeastPages);
}

// ConsumeQueueStore.java, lines 208-210
private FileQueueLifeCycle getLifeCycle(String topic, int queueId) {
    return findOrCreateConsumeQueue(topic, queueId);
}
```

A possible fix would be to have `flush(ConsumeQueueInterface, int)` cast and flush the passed-in `consumeQueue` object directly instead of looking it up again via `findOrCreateConsumeQueue`, or to add a deletion-aware check before calling `findOrCreateConsumeQueue`.
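The interleaving in the reproduction steps can be replayed deterministically in a small single-threaded model. This is a hypothetical sketch: `SimpleQueue`, `MiniConsumeQueueStore`, `findOrCreate`, and `flushByLookup` are illustrative stand-ins, not RocketMQ classes, and `computeIfAbsent` is only assumed to approximate the find-or-create semantics of `findOrCreateConsumeQueue`. It pins the race window by taking the reference first, running the delete to completion, and only then flushing through a (topic, queueId) lookup.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical, simplified model of the lookup path described in the issue.
 * None of these types are RocketMQ classes.
 */
public class ZombieQueueDemo {

    // Stand-in for a ConsumeQueue; tracks identity and whether it was destroyed.
    static final class SimpleQueue {
        final String topic;
        final int queueId;
        volatile boolean destroyed;

        SimpleQueue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }

        // Returns true if something was flushed; a destroyed queue flushes nothing.
        boolean flush() {
            return !destroyed;
        }
    }

    static final class MiniConsumeQueueStore {
        // topic -> (queueId -> queue), mirroring the shape of consumeQueueTable
        final ConcurrentMap<String, ConcurrentMap<Integer, SimpleQueue>> table = new ConcurrentHashMap<>();

        // Assumed find-or-create semantics: a missing topic/queue is created on demand.
        SimpleQueue findOrCreate(String topic, int queueId) {
            return table.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                        .computeIfAbsent(queueId, id -> new SimpleQueue(topic, id));
        }

        // Mirrors deleteTopic: destroy every queue, then drop the topic entry.
        void deleteTopic(String topic) {
            ConcurrentMap<Integer, SimpleQueue> queues = table.get(topic);
            if (queues == null) {
                return;
            }
            queues.values().forEach(q -> q.destroyed = true);
            table.remove(topic);
        }

        // Buggy flush: re-resolves the queue by (topic, queueId) instead of using the reference.
        boolean flushByLookup(SimpleQueue cq) {
            return findOrCreate(cq.topic, cq.queueId).flush();
        }
    }

    public static void main(String[] args) {
        MiniConsumeQueueStore store = new MiniConsumeQueueStore();
        store.findOrCreate("TopicA", 0);

        // Step 2: the flush thread grabs a reference while iterating the table.
        SimpleQueue snapshot = store.table.get("TopicA").get(0);

        // Steps 1 and 3: deleteTopic runs to completion.
        store.deleteTopic("TopicA");
        System.out.println("after delete: " + store.table.containsKey("TopicA"));

        // Steps 4 and 5: the flush thread re-resolves the queue and recreates the topic.
        store.flushByLookup(snapshot);
        System.out.println("after flush: " + store.table.containsKey("TopicA"));
    }
}
```

Running `main` prints `after delete: false` followed by `after flush: true`: the lookup-based flush puts a fresh, undestroyed queue back under the deleted topic, which is the zombie entry described in the issue.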

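Both fixes suggested in the issue can be sketched against the same kind of hypothetical model. `flushDirect` flushes the instance the flush thread already holds (the analogue of casting `consumeQueue` to `FileQueueLifeCycle` and flushing it), while `flushIfPresent` performs a read-only lookup that treats a missing topic or queue as deleted. All names here are illustrative assumptions, not RocketMQ APIs, and the model does not establish whether flushing an already-destroyed ConsumeQueue is harmless in the real store.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical model of the two fixes suggested in the issue.
 * None of these types are RocketMQ classes.
 */
public class FlushWithoutRecreateDemo {

    // Stand-in for a ConsumeQueue that knows whether it has been destroyed.
    static final class SimpleQueue {
        final String topic;
        final int queueId;
        volatile boolean destroyed;

        SimpleQueue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }

        // Returns true if something was flushed; a destroyed queue flushes nothing.
        boolean flush() {
            return !destroyed;
        }
    }

    static final class MiniConsumeQueueStore {
        final ConcurrentMap<String, ConcurrentMap<Integer, SimpleQueue>> table = new ConcurrentHashMap<>();

        SimpleQueue findOrCreate(String topic, int queueId) {
            return table.computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                        .computeIfAbsent(queueId, id -> new SimpleQueue(topic, id));
        }

        void deleteTopic(String topic) {
            ConcurrentMap<Integer, SimpleQueue> queues = table.get(topic);
            if (queues == null) {
                return;
            }
            queues.values().forEach(q -> q.destroyed = true);
            table.remove(topic);
        }

        // Fix option 1: flush the reference the caller already holds; the table is never touched.
        boolean flushDirect(SimpleQueue cq) {
            return cq.flush();
        }

        // Fix option 2: deletion-aware lookup; read-only, so a deleted topic stays deleted.
        boolean flushIfPresent(SimpleQueue cq) {
            ConcurrentMap<Integer, SimpleQueue> queues = table.get(cq.topic);
            if (queues == null) {
                return false; // topic already deleted: skip instead of recreating it
            }
            SimpleQueue current = queues.get(cq.queueId);
            return current != null && current.flush();
        }
    }

    public static void main(String[] args) {
        MiniConsumeQueueStore store = new MiniConsumeQueueStore();
        store.findOrCreate("TopicA", 0);
        SimpleQueue snapshot = store.table.get("TopicA").get(0);

        store.deleteTopic("TopicA");
        store.flushDirect(snapshot);
        store.flushIfPresent(snapshot);

        // Neither flush path re-inserts the deleted topic.
        System.out.println("topic present: " + store.table.containsKey("TopicA"));
    }
}
```

Option 2 still reads the table without a lock, so it can observe the topic just before `deleteTopic` removes it; what it cannot do is re-insert the topic afterwards, which is the property the issue asks for. Option 1 avoids the table entirely, at the cost of possibly flushing an instance that has just been destroyed.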