BewareMyPower opened a new pull request #12586: URL: https://github.com/apache/pulsar/pull/12586
### Motivation

https://github.com/apache/pulsar/pull/11600 adds a timeout to the GetLastMessageId request by using `sendRequestWithId` instead of `sendCommand`. However, it is still incorrect: when the request times out, the future that is completed with `ResultTimeout` is the one returned by `sendRequestWithId`, not `promise.getFuture()`. Therefore, even if the request receives no response within `operationTimeout` seconds, the future returned by `newGetLastMessageId` is never completed.

Besides, when I tried to complete the `promise` in `sendRequestWithId`'s callback, I found a deadlock if `ServerCnx#handleGetLastMessageId` hangs forever (I just added a long `sleep` call to this method).

```
frame #3: std::__1::mutex::lock() + 9
frame #4: pulsar::Promise<pulsar::Result, pulsar::ResponseData>::setFailed(pulsar::Result) const [inlined] std::__1::unique_lock<std::__1::mutex>::unique_lock(this=<unavailable>, __m=0x00007ffb1b0044d8) at __mutex_base:119:61 [opt]
frame #5: pulsar::Promise<pulsar::Result, pulsar::ResponseData>::setFailed(pulsar::Result) const [inlined] std::__1::unique_lock<std::__1::mutex>::unique_lock(this=<unavailable>, __m=0x00007ffb1b0044d8) at __mutex_base:119 [opt]
frame #6: pulsar::Promise<pulsar::Result, pulsar::ResponseData>::setFailed(this=0x00007ffb1b0046e8, result=ResultConnectError) const at Future.h:118 [opt]
frame #7: pulsar::ClientConnection::close(this=<unavailable>, result=ResultConnectError) at ClientConnection.cc:1556:27 [opt]
```

We can see that `Promise::setFailed` is stuck in `ClientConnection::close`:

```c++
for (auto& kv : pendingRequests) {
    kv.second.promise.setFailed(ResultConnectError);
}
```

This happens because the future's callback is invoked inside `setFailed` while the lock is held. The callback itself also calls `setFailed`, which tries to acquire the same non-reentrant lock, so a deadlock occurs.

### Modifications

Refactor the `Future`/`Promise` infrastructure. The current design is dated and its code style is poor.
The important points of the refactoring are:

1. Change the `completed_` field (originally the `complete` field) to an atomic variable, so that checking whether the future is completed requires no lock.
2. Move the triggering of `listeners_` out of the locked code block, so that each listener (callback) is triggered without holding any lock.
3. Move `condition_variable::notify_all()` out of the locked code block as well. The notifying thread does not need to hold the lock, see https://en.cppreference.com/w/cpp/thread/condition_variable/notify_all.
4. Move all related implementations into `InternalState` so that `Future` and `Promise` only need to call them directly.

Then add a `PromiseTest` to protect the refactoring.

Based on the refactoring, add a callback to `sendRequestWithId` in `newGetLastMessageId` to make sure the request timeout works.

### Verifying this change

- [ ] Make sure that the change passes the CI checks.

It's hard to simulate the operation timeout, so I had to add the following code to `ServerCnx#handleGetLastMessageId`

```java
try {
    Thread.sleep(1000000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
```

and run a reader that calls `hasMessageAvailable` with a 3-second operation timeout. After the call failed, the client exited after 6 seconds, which is twice the operation timeout.
The logs are:

```
2021-11-02 20:39:39.685 ERROR [0x11b16de00] SampleConsumer:48 | Failed to check hasMessageAvailable: TimeOut
2021-11-02 20:39:39.685 INFO [0x11b16de00] ClientImpl:492 | Closing Pulsar client with 0 producers and 1 consumers
2021-11-02 20:39:39.685 INFO [0x11b16de00] ConsumerImpl:894 | [persistent://public/default/my-topic-0, reader-35479ac30a, 0] Closing consumer for topic persistent://public/default/my-topic-0
2021-11-02 20:39:43.686 ERROR [0x700006b1e000] ConsumerImpl:952 | [persistent://public/default/my-topic-0, reader-35479ac30a, 0] Failed to close consumer: TimeOut
2021-11-02 20:39:43.686 INFO [0x700006b1e000] ClientConnection:1542 | [[::1]:60215 -> [::1]:6650] Connection closed
2021-11-02 20:39:43.687 INFO [0x11b16de00] ClientConnection:255 | [[::1]:60215 -> [::1]:6650] Destroyed connection
```
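For readers who want to see why points 1 to 3 of the refactoring remove the deadlock, here is a minimal sketch. It is not the code from this PR: `SharedState` and `demoReentrantCompletion` are hypothetical names, and the real `InternalState` may differ in its members and signatures. The sketch assumes only what the description states: an atomic completion flag, listeners invoked after the lock is released, and `notify_all()` called without holding the lock.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified shared state behind a promise/future pair.
// Illustrative only; this is NOT pulsar::InternalState.
template <typename Result, typename Value>
class SharedState {
   public:
    using Listener = std::function<void(Result, const Value&)>;

    // Point 1: the flag is atomic, so this check takes no lock.
    bool completed() const { return completed_.load(); }

    // Completes the state once; returns false if it was already completed.
    bool complete(Result result, const Value& value) {
        std::vector<Listener> listeners;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (completed_.load()) {
                return false;
            }
            result_ = result;
            value_ = value;
            completed_.store(true);
            listeners.swap(listeners_);  // take ownership while locked
        }
        // Points 2 and 3: notify and run callbacks with no lock held, so a
        // callback may call complete() again without blocking on mutex_.
        cond_.notify_all();
        for (auto& listener : listeners) {
            listener(result, value);
        }
        return true;
    }

    // Registers a callback; runs it immediately if already completed.
    void addListener(Listener listener) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (!completed_.load()) {
                listeners_.push_back(std::move(listener));
                return;
            }
        }
        listener(result_, value_);  // state is immutable once completed
    }

    // Blocks until completion, then copies out the value.
    Result get(Value& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return completed_.load(); });
        value = value_;
        return result_;
    }

   private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::atomic<bool> completed_{false};
    Result result_{};
    Value value_{};
    std::vector<Listener> listeners_;
};

// A listener that completes the same state again, mimicking a callback that
// calls setFailed() re-entrantly. Returns true if everything behaved.
bool demoReentrantCompletion() {
    SharedState<int, int> state;
    int calls = 0;
    bool secondAccepted = true;
    state.addListener([&](int, const int&) {
        ++calls;
        secondAccepted = state.complete(-1, 0);  // re-entrant call
    });
    const bool firstAccepted = state.complete(0, 42);
    int value = 0;
    const int result = state.get(value);
    return firstAccepted && !secondAccepted && calls == 1 && result == 0 && value == 42;
}
```

If `complete()` invoked the listeners while still holding `mutex_`, the re-entrant call inside the listener would try to lock a `std::mutex` already owned by the same thread, which is the situation shown in the stack trace in the motivation. Swapping the listeners into a local vector before unlocking also guarantees that each listener runs at most once.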
