BewareMyPower opened a new pull request #12586:
URL: https://github.com/apache/pulsar/pull/12586


   ### Motivation
   
   https://github.com/apache/pulsar/pull/11600 added a timeout to the 
GetLastMessageId request by using `sendRequestWithId` instead of `sendCommand`. 
However, the fix is still incomplete. When the request times out, the future 
that is completed with `ResultTimeout` is the one returned by 
`sendRequestWithId`, not `promise.getFuture()`. Therefore, even if the request 
receives no response within `operationTimeout` seconds, the future returned by 
`newGetLastMessageId` is never completed.
   
   Besides, when I tried to complete the `promise` in the callback of 
`sendRequestWithId`, I found a deadlock that occurs if 
`ServerCnx#handleGetLastMessageId` hangs forever (I simulated this by adding a 
long `sleep` call to that method).
   
   ```
       frame #3: std::__1::mutex::lock() + 9
       frame #4: pulsar::Promise<pulsar::Result, 
pulsar::ResponseData>::setFailed(pulsar::Result) const [inlined] 
std::__1::unique_lock<std::__1::mutex>::unique_lock(this=<unavailable>, 
__m=0x00007ffb1b0044d8) at __mutex_base:119:61 [opt]
       frame #5: pulsar::Promise<pulsar::Result, 
pulsar::ResponseData>::setFailed(pulsar::Result) const [inlined] 
std::__1::unique_lock<std::__1::mutex>::unique_lock(this=<unavailable>, 
__m=0x00007ffb1b0044d8) at __mutex_base:119 [opt]
       frame #6: pulsar::Promise<pulsar::Result, 
pulsar::ResponseData>::setFailed(this=0x00007ffb1b0046e8, 
result=ResultConnectError) const at Future.h:118 [opt]
       frame #7: pulsar::ClientConnection::close(this=<unavailable>, 
result=ResultConnectError) at ClientConnection.cc:1556:27 [opt]
   ```
   
   We can see that `Promise::setFailed` is stuck in `ClientConnection::close`:
   
   ```c++
        for (auto& kv : pendingRequests) {
            kv.second.promise.setFailed(ResultConnectError);
        }  
   ```
   
   This is because `setFailed` invokes the future's callback while holding the 
lock. The callback in turn calls `setFailed`, which tries to acquire the same 
non-reentrant lock, so a deadlock occurs.
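
The pattern can be reproduced in isolation. The following is a minimal sketch, not the Pulsar code: `CheckedMutex` and `OldStylePromise` are hypothetical names, and the checked lock throws where a plain `std::mutex` would hang, so the example terminates instead of deadlocking.

```cpp
#include <atomic>
#include <cassert>
#include <functional>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <utility>

// Hypothetical checked lock: wraps std::mutex, but throws instead of hanging
// when the thread that already owns it tries to lock it again.
class CheckedMutex {
   public:
    void lock() {
        if (owner_.load() == std::this_thread::get_id()) {
            throw std::logic_error("would self-deadlock");
        }
        mutex_.lock();
        owner_.store(std::this_thread::get_id());
    }
    void unlock() {
        owner_.store(std::thread::id());
        mutex_.unlock();
    }

   private:
    std::mutex mutex_;
    std::atomic<std::thread::id> owner_{std::thread::id()};
};

// Simplified stand-in for the pre-refactoring shape: the listener runs while
// the lock is still held.
class OldStylePromise {
   public:
    void setListener(std::function<void()> listener) { listener_ = std::move(listener); }

    void setFailed() {
        std::lock_guard<CheckedMutex> lock(mutex_);
        if (listener_) listener_();  // callback invoked under the lock
    }

   private:
    CheckedMutex mutex_;
    std::function<void()> listener_;
};

// Returns true when the re-entrant setFailed() call is detected.
bool reentrantSetFailedDetected() {
    OldStylePromise promise;
    // The listener calls setFailed() again on the same promise.
    promise.setListener([&promise] { promise.setFailed(); });
    try {
        promise.setFailed();
    } catch (const std::logic_error&) {
        return true;
    }
    return false;
}
```

With a plain `std::mutex` in place of `CheckedMutex`, the inner `setFailed()` call would block forever on the lock held by the outer call, which matches the stack trace above.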
   
   ### Modifications
   
   Refactor the `Future`/`Promise` infrastructure. The current design is 
outdated and the code style is poor. The key points of the refactoring are:
   1. Change the `completed_` field (originally named `complete`) to an atomic 
variable, so that no lock is required to check whether the future is completed.
   2. Move the triggering of `listeners_` out of the locked code block, so that 
each listener (callback) is triggered without holding any lock.
   3. Move `condition_variable::notify_all()` out of the locked code block as 
well. The notifying thread does not need to hold the lock, see 
https://en.cppreference.com/w/cpp/thread/condition_variable/notify_all.
   4. Move all related implementations into `InternalState` so that `Future` 
and `Promise` only need to call them directly.
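
The four points above can be sketched roughly as follows. This is a simplified illustration rather than the code in this PR: the member names other than `completed_` and `listeners_`, the `int` result type, and the helper `reentrantCompletions` are assumptions made for the example.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <functional>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Simplified, hypothetical shared state behind a Future/Promise pair.
template <typename Result, typename Type>
class InternalState {
   public:
    using Listener = std::function<void(Result, const Type&)>;

    // Point 1: lock-free completion check via an atomic flag.
    bool completed() const { return completed_.load(); }

    // Returns false if the state was already completed.
    bool complete(Result result, const Type& value) {
        std::vector<Listener> listeners;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            if (completed_) return false;
            result_ = result;
            value_ = value;
            completed_ = true;
            listeners.swap(listeners_);
        }
        // Points 2 and 3: the lock is released before notifying waiters and
        // before running listeners, so a listener may call complete() again.
        cond_.notify_all();
        for (auto& listener : listeners) listener(result, value);
        return true;
    }

    void addListener(Listener listener) {
        std::unique_lock<std::mutex> lock(mutex_);
        if (completed_) {
            Result result = result_;
            Type value = value_;
            lock.unlock();
            listener(result, value);  // already completed: run immediately, unlocked
        } else {
            listeners_.push_back(std::move(listener));
        }
    }

    // Blocks until completed, then returns the result and fills in the value.
    Result get(Type& value) {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return completed_.load(); });
        value = value_;
        return result_;
    }

   private:
    std::mutex mutex_;
    std::condition_variable cond_;
    std::atomic<bool> completed_{false};
    Result result_{};
    Type value_{};
    std::vector<Listener> listeners_;
};

// Counts successful completions when a listener re-enters complete().
int reentrantCompletions() {
    InternalState<int, std::string> state;
    int succeeded = 0;
    state.addListener([&](int, const std::string&) {
        // Would self-deadlock if listeners were run while mutex_ is held.
        if (state.complete(2, "second")) ++succeeded;
    });
    if (state.complete(1, "first")) ++succeeded;
    assert(state.completed());
    return succeeded;
}
```

In this sketch a `Future` or `Promise` would hold a `std::shared_ptr` to the state and forward its calls to it (point 4). The re-entrant `complete()` call inside the listener returns `false` instead of blocking, because the listener runs after the lock has been released.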
   
   Then add a `PromiseTest` to cover the refactored code. Building on the 
refactoring, add a callback to the future returned by `sendRequestWithId` in 
`newGetLastMessageId` so that the request timeout takes effect.
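
The forwarding idea can be sketched as follows. This is a single-threaded illustration under stated assumptions, not the actual client code: `SimplePromise`, the shape of `ResponseData`, and `forwardRequestOutcome` are hypothetical stand-ins, and `Result` is a two-value stand-in for `pulsar::Result`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

enum Result { ResultOk, ResultTimeout };  // stand-in for pulsar::Result

// Minimal single-threaded promise (hypothetical); copies share one state.
template <typename T>
class SimplePromise {
   public:
    using Listener = std::function<void(Result, const T&)>;

    bool complete(Result result, const T& value) {
        if (state_->done) return false;
        state_->done = true;
        state_->result = result;
        state_->value = value;
        std::vector<Listener> listeners;
        listeners.swap(state_->listeners);
        for (auto& listener : listeners) listener(result, value);
        return true;
    }

    void addListener(Listener listener) {
        if (state_->done) {
            listener(state_->result, state_->value);
        } else {
            state_->listeners.push_back(std::move(listener));
        }
    }

    bool done() const { return state_->done; }
    Result result() const { return state_->result; }

   private:
    struct State {
        bool done = false;
        Result result = ResultOk;
        T value{};
        std::vector<Listener> listeners;
    };
    std::shared_ptr<State> state_ = std::make_shared<State>();
};

struct ResponseData {  // hypothetical payload shape
    std::string lastMessageId;
};

// Forwards the outcome of the request, including a timeout, to the promise
// whose future the caller actually waits on.
SimplePromise<std::string> forwardRequestOutcome(SimplePromise<ResponseData> request) {
    SimplePromise<std::string> promise;
    request.addListener([promise](Result result, const ResponseData& data) mutable {
        promise.complete(result, result == ResultOk ? data.lastMessageId : std::string());
    });
    return promise;
}

// Simulates a request that times out and returns what the caller observes.
Result timeoutReachesCaller() {
    SimplePromise<ResponseData> request;
    SimplePromise<std::string> caller = forwardRequestOutcome(request);
    assert(!caller.done());
    request.complete(ResultTimeout, ResponseData{});
    assert(caller.done());
    return caller.result();
}
```

Without the listener, a timeout would complete only `request`, and the promise seen by the caller would stay pending, which is the behavior described in the Motivation section.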
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   It's hard to simulate the operation timeout, so I added the following code 
to `ServerCnx#handleGetLastMessageId`
   
   ```java
           try {
               Thread.sleep(1000000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
   ```
   
   and ran a reader that calls `hasMessageAvailable` with a 3-second operation 
timeout. After the call failed, the client exited after 6 seconds, which is 
twice the operation timeout. The logs are:
   
   ```
   2021-11-02 20:39:39.685 ERROR [0x11b16de00] SampleConsumer:48 | Failed to 
check hasMessageAvailable: TimeOut
   2021-11-02 20:39:39.685 INFO  [0x11b16de00] ClientImpl:492 | Closing Pulsar 
client with 0 producers and 1 consumers
   2021-11-02 20:39:39.685 INFO  [0x11b16de00] ConsumerImpl:894 | 
[persistent://public/default/my-topic-0, reader-35479ac30a, 0] Closing consumer 
for topic persistent://public/default/my-topic-0
   2021-11-02 20:39:43.686 ERROR [0x700006b1e000] ConsumerImpl:952 | 
[persistent://public/default/my-topic-0, reader-35479ac30a, 0] Failed to close 
consumer: TimeOut
   2021-11-02 20:39:43.686 INFO  [0x700006b1e000] ClientConnection:1542 | 
[[::1]:60215 -> [::1]:6650] Connection closed
   2021-11-02 20:39:43.687 INFO  [0x11b16de00] ClientConnection:255 | 
[[::1]:60215 -> [::1]:6650] Destroyed connection
   ```



