Vanlightly commented on pull request #11691:
URL: https://github.com/apache/pulsar/pull/11691#issuecomment-901879130


   @315157973 @codelipenghui I have an implementation based on the suggestion, 
but there was one complication: `failingPendingReceive()`.
   
   On `connectionOpened`, `closeAsync`, and `unsubscribeAsync`, any pending 
receives are completed exceptionally. `ConsumerImpl` submits that work to the 
`internalPinnedExecutor`, but `MultiTopicsConsumerImpl` executes it 
synchronously. 
   
   The issues here:
   - All callers of `failingPendingReceive()` treat it as synchronous code, 
even though it may run asynchronously. I haven't been able to trigger a 
problem, but in theory, once `closeAsync` completes, there could still be 
pending receives that have not yet been completed exceptionally, and 
`shutdown()` could throw them away, leaving user code blocked on `batchReceive`.
   - If we choose to run it synchronously, the polling of 
`pendingBatchReceives` becomes multi-threaded again and therefore needs the lock.
   - If we choose to run it asynchronously, it needs to return a 
`CompletableFuture`, and `connectionOpened`, `closeAsync`, and 
`unsubscribeAsync` must treat it as an asynchronous operation. 
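
   To make the asynchronous option concrete, here is a minimal sketch. It is not the Pulsar code: `PendingReceiveSketch`, `failPendingReceive`, and the `String` payload are hypothetical stand-ins, and a plain single-threaded executor plays the role of `internalPinnedExecutor`. The point is only that `closeAsync` completes after the pending receives have been failed, not before.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a consumer; not the real ConsumerImpl.
class PendingReceiveSketch {
    // Single-threaded executor standing in for internalPinnedExecutor.
    private final ExecutorService pinnedExecutor = Executors.newSingleThreadExecutor();
    // Polled only on the pinned thread; callers may add from any thread.
    private final Queue<CompletableFuture<String>> pendingReceives = new ConcurrentLinkedQueue<>();

    CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> f = new CompletableFuture<>();
        pendingReceives.add(f);
        return f;
    }

    // The returned future completes only after every pending receive has been failed.
    CompletableFuture<Void> failPendingReceive() {
        return CompletableFuture.runAsync(() -> {
            CompletableFuture<String> f;
            while ((f = pendingReceives.poll()) != null) {
                f.completeExceptionally(new IllegalStateException("consumer closed"));
            }
        }, pinnedExecutor);
    }

    // Close is chained on the failure step instead of assuming it already ran.
    CompletableFuture<Void> closeAsync() {
        return failPendingReceive().thenRun(pinnedExecutor::shutdown);
    }

    public static void main(String[] args) {
        PendingReceiveSketch consumer = new PendingReceiveSketch();
        CompletableFuture<String> receive = consumer.receiveAsync();
        consumer.closeAsync().join();
        System.out.println("failed on close: " + receive.isCompletedExceptionally());
    }
}
```

   With this shape, a caller that waits on `closeAsync()` can rely on every earlier receive future being completed, which is the guarantee the synchronous-looking call sites currently assume.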
   
   I have an implementation where `failingPendingReceive()` returns a 
`CompletableFuture` and all calling code treats it as an asynchronous call. The 
one thing I don't like is that if user code calls `shutdown()`, `batchReceive` 
can still end up blocking forever, because the `batchReceive` future can get 
discarded.
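
   The discarded-future case can be illustrated in isolation. The sketch below assumes that shutdown behaves like `ExecutorService.shutdownNow()`, which returns queued tasks without running them; whether the client's `shutdown()` does exactly that is an assumption here, and `DiscardedFutureSketch` is a hypothetical name. A task that would have failed the pending receive is dropped, so the future never completes.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DiscardedFutureSketch {
    // Returns true if the receive future is still incomplete after shutdownNow().
    static boolean futureLeftIncomplete() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CompletableFuture<String> pendingReceive = new CompletableFuture<>();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch blocker = new CountDownLatch(1);
        try {
            // Occupy the only thread so the next task stays in the queue.
            executor.execute(() -> {
                started.countDown();
                try {
                    blocker.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            started.await();
            // Queued task that would have failed the pending receive.
            executor.execute(() ->
                    pendingReceive.completeExceptionally(new IllegalStateException("closed")));

            // Interrupts the running task and hands back the queued one unexecuted.
            List<Runnable> dropped = executor.shutdownNow();
            executor.awaitTermination(1, TimeUnit.SECONDS);
            try {
                pendingReceive.get(200, TimeUnit.MILLISECONDS);
                return false; // completed normally: not discarded
            } catch (ExecutionException e) {
                return false; // completed exceptionally: not discarded
            } catch (TimeoutException e) {
                return dropped.size() == 1 && !pendingReceive.isDone();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("future left incomplete: " + futureLeftIncomplete());
    }
}
```

   A caller blocked on such a future without a timeout would wait indefinitely, which is the behaviour described above.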
   
   I am in the process of running some nasty multi-threaded tests with 
concurrent calls to `batchReceive` and `close`, using one client, multiple 
shared consumers, and multiple threads per consumer, against both 
non-partitioned and partitioned topics. I will force push once I am satisfied 
there are no race conditions. 
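
   For reference, the general shape of such a test looks roughly like the sketch below. It runs against a hypothetical in-memory `SketchConsumer` rather than a real Pulsar client or broker, so it only shows the pattern: many threads issue receives while close races with them, and the check is that no receive future is left incomplete.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.*;

class CloseRaceSketch {
    // Hypothetical minimal consumer: all state is touched only on one pinned thread.
    static class SketchConsumer {
        private final ExecutorService pinned = Executors.newSingleThreadExecutor();
        private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();
        private boolean closed;

        CompletableFuture<String> batchReceiveAsync() {
            CompletableFuture<String> f = new CompletableFuture<>();
            try {
                pinned.execute(() -> {
                    if (closed) {
                        f.completeExceptionally(new IllegalStateException("closed"));
                    } else {
                        pending.add(f);
                    }
                });
            } catch (RejectedExecutionException e) {
                f.completeExceptionally(e); // executor already shut down
            }
            return f;
        }

        CompletableFuture<Void> closeAsync() {
            return CompletableFuture.runAsync(() -> {
                closed = true;
                CompletableFuture<String> f;
                while ((f = pending.poll()) != null) {
                    f.completeExceptionally(new IllegalStateException("closed"));
                }
            }, pinned).thenRun(pinned::shutdown);
        }
    }

    // Returns the number of receive futures that never completed.
    static int run(int threads, int receivesPerThread) {
        SketchConsumer consumer = new SketchConsumer();
        ExecutorService callers = Executors.newFixedThreadPool(threads);
        Queue<CompletableFuture<String>> all = new ConcurrentLinkedQueue<>();
        CountDownLatch start = new CountDownLatch(1);
        for (int t = 0; t < threads; t++) {
            callers.execute(() -> {
                try { start.await(); } catch (InterruptedException e) { return; }
                for (int i = 0; i < receivesPerThread; i++) {
                    all.add(consumer.batchReceiveAsync());
                }
            });
        }
        start.countDown();
        consumer.closeAsync().join(); // close races with the receive calls
        callers.shutdown();
        int stuck = 0;
        try {
            callers.awaitTermination(10, TimeUnit.SECONDS);
            for (CompletableFuture<String> f : all) {
                try {
                    f.get(1, TimeUnit.SECONDS);
                } catch (TimeoutException e) {
                    stuck++; // this future was lost
                } catch (ExecutionException e) {
                    // failed exceptionally, which is the expected outcome
                }
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return stuck;
    }

    public static void main(String[] args) {
        System.out.println("stuck futures: " + run(8, 1000));
    }
}
```

   The real tests additionally vary the topic type and use several shared consumers per client, which this sketch does not attempt to model.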


