TakaHiR07 opened a new issue, #24977: URL: https://github.com/apache/pulsar/issues/24977
### Search before reporting

- [x] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Read release policy

- [x] I understand that [unsupported versions](https://pulsar.apache.org/contribute/release-policy/#supported-versions) don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

### User environment

pulsar-version: 3.0.x

### Issue Description

When restarting the broker, I noticed that the number of subscriptions on tenant-1/namespace-1/__change_events increases unexpectedly. The situation is almost the same as described in https://github.com/apache/pulsar/pull/24658. However, I can't find any log showing that closing a reader failed. After dumping the broker heap, I confirmed that the thousands of readers remaining in SystemTopicBasedTopicPoliciesService have never been closed since they were created successfully.

After diving into the code, I found that the root cause is a concurrency bug introduced by https://github.com/apache/pulsar/pull/21231. That PR introduces two problems.

Problem 1: cleanCacheAndCloseReader() is executed twice when an exception is thrown

In SystemTopicBasedTopicPoliciesService#prepareInitPoliciesCacheAsync, there are three steps:

1. createReader
2. initPolicesCache (read messages from the earliest position)
3. readMorePoliciesAsync (leave a background job that reads new policies)

If an exception occurs in any of the three steps, the failed reader should be cleaned up. However, when cleanCacheAndCloseReader() executes twice, the following concurrency error can occur:
```text
Request-1: policyCacheInitMap put future1
Request-1: create reader1
Request-1: readerCaches put reader1
reader1 read error
Request-1: first time cleanCacheAndCloseReader(), include:
    remove reader1 in readerCaches
    close reader1
    remove future1 in policyCacheInitMap
Request-2: policyCacheInitMap put future2
Request-1: second time cleanCacheAndCloseReader(), only remove future2 in policyCacheInitMap
Request-2: create reader2
Request-2: readerCaches put reader2
Request-3: policyCacheInitMap put future3
Request-3: create reader3
Request-3: readerCaches put reader3
```

At this point reader2 has become an orphan and can never be cleaned up. Each create process should correspond to exactly one call to cleanCacheAndCloseReader().

Problem 2: there is a recursive update in ConcurrentHashMap

policyCacheInitMap.computeIfAbsent() puts a future into the map. When this future fails with an exception, it calls cleanCacheAndCloseReader(), which updates policyCacheInitMap again, possibly on the same thread. It is not appropriate to do complex operations inside computeIfAbsent(), especially a second update of the same map.

I notice that https://github.com/apache/pulsar/pull/24658 is relevant to problem 1, but it makes the logic more complex, and a situation like request-3 modifying a resource created by request-2 can still occur. It also does not seem to fix this case: if closing the reader throws an exception in cleanCacheAndCloseReader, the reader is still orphaned.

https://github.com/apache/pulsar/pull/24939 is relevant to problem 2, but it only moves the cleanCacheAndCloseReader() call triggered in one place onto a different thread. readMorePoliciesAsync() also triggers cleanCacheAndCloseReader(), although not on the same thread. In any case, I think it is not appropriate to keep the double-update code in ConcurrentHashMap.

### Error messages

```text
```

### Reproducing the issue

Restart the broker. The issue may occur.

### Additional information

_No response_

### Are you willing to submit a PR?
- [x] I'm willing to submit a PR!

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at: [email protected]
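
To illustrate the "one cleanup per create" idea from Problem 1 and the "no nested map update" point from Problem 2, here is a minimal sketch. It is not a proposed patch and is not how Pulsar is implemented. `FakeReader`, `InitAttempt`, and `PolicyCacheSketch` are hypothetical names made up for this example. The only real APIs assumed are the JDK's `ConcurrentHashMap.putIfAbsent`, `ConcurrentHashMap.remove(key, value)`, `AtomicBoolean.compareAndSet`, and `CompletableFuture`. The sketch also collapses the two maps (policyCacheInitMap and readerCaches) into one map of attempts, which is a simplifying assumption.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a system-topic reader; it only records whether close() ran.
final class FakeReader {
    final AtomicBoolean closed = new AtomicBoolean(false);

    void close() {
        closed.set(true);
    }
}

// Hypothetical holder: one initialization attempt owns one reader future and one cleanup.
final class InitAttempt {
    final CompletableFuture<FakeReader> readerFuture = new CompletableFuture<>();
    final AtomicBoolean cleaned = new AtomicBoolean(false);
}

final class PolicyCacheSketch {
    final ConcurrentHashMap<String, InitAttempt> attempts = new ConcurrentHashMap<>();

    // Registers a new attempt unless one is already in flight.
    // putIfAbsent takes a plain value, so no callback runs inside the map operation
    // and nothing can update the map recursively from within it.
    InitAttempt begin(String namespace) {
        InitAttempt fresh = new InitAttempt();
        InitAttempt existing = attempts.putIfAbsent(namespace, fresh);
        if (existing != null) {
            return existing; // join the attempt that is already registered
        }
        // Reader creation happens after the map call returned (synchronous here for brevity).
        fresh.readerFuture.complete(new FakeReader());
        return fresh;
    }

    // Cleanup scoped to one attempt. Returns true only for the call that did the work.
    boolean cleanup(String namespace, InitAttempt attempt) {
        if (!attempt.cleaned.compareAndSet(false, true)) {
            return false; // a second cleanup of the same attempt is a no-op
        }
        // Two-argument remove: deletes the entry only if it still maps to this attempt,
        // so a stale cleanup cannot evict an entry registered by a later request.
        attempts.remove(namespace, attempt);
        // Close this attempt's own reader once it exists.
        attempt.readerFuture.thenAccept(FakeReader::close);
        return true;
    }
}
```

Replaying the trace from the issue against this sketch: the second cleanup for request 1 returns `false` and leaves request 2's entry in the map, so request 3 joins request 2's attempt instead of creating another reader. The sketch does not address the remaining case mentioned above, where closing the reader itself fails.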
