massakam opened a new pull request #6966:
URL: https://github.com/apache/pulsar/pull/6966


   ### Motivation
   
   If a large number of consumers are repeatedly added to and removed from a Shared subscription, message delivery to consumers occasionally stops. Delivery does not resume until the topic is unloaded or the broker is restarted.
   
   With the broker log level set to DEBUG, the following message was logged while message delivery was stopped:
   
   > 17:17:16.425 [pulsar-io-20-6] DEBUG o.a.p.b.s.p.PersistentDispatcherMultipleConsumers - [persistent://massakam/global/test/pt16-partition-15 / sub1] Cannot schedule next read until previous one is done
   
   This means that the `havePendingRead` flag in `PersistentDispatcherMultipleConsumers` remains `true`, so every attempt to read new entries is skipped.
   
https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L312-L321
   
   However, the internal stats (`stats-internal`) of that topic show no actual pending read operations (`pendingReadOps` is 0 and `waitingReadOp` is `false`). Therefore, the value of `havePendingRead` appears to be wrong.
   ```json
   "cursors" : {
     "sub1" : {
       "markDeletePosition" : "2383570:2158",
       "readPosition" : "2383570:2159",
       "waitingReadOp" : false,
       "pendingReadOps" : 0,
       "messagesConsumedCounter" : 309909,
       "cursorLedger" : 2383013,
       "cursorLedgerLastEntry" : 775,
       "individuallyDeletedMessages" : "[]",
       "lastLedgerSwitchTimestamp" : "2020-05-13T13:10:35.721+09:00",
       "state" : "Open",
       "numberOfEntriesSinceFirstNotAckedMessage" : 1,
       "totalNonContiguousDeletedMessagesRange" : 0,
       "properties" : { }
     }
   }
   ```
   
   On further investigation, I found that an `IllegalArgumentException` was thrown on the following line:
   
https://github.com/apache/pulsar/blob/v2.3.2/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L318
   
   ```
   java.lang.IllegalArgumentException: null
           at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
           at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReadEntriesOrWait(ManagedCursorImpl.java:561)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:325)
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readMoreEntries$2(PersistentDispatcherMultipleConsumers.java:255)
           at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
           at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)
           at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
           at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
           at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:384)
           at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
           at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
           at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
           at java.lang.Thread.run(Thread.java:748)
   ```
   
   This means that `messagesToRead` was 0 or less. Such a value can be assigned to `messagesToRead` in the following code:
   
https://github.com/apache/pulsar/blob/1fd1b2b440af2477f916999a67752f9f532d1620/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L254-L255
   
   This method is not synchronized, so `totalAvailablePermits` may be updated by other threads between these two lines. Even if `totalAvailablePermits` is greater than 0 when the `if` condition is evaluated, it may already be 0 or less when `Math.min()` reads it on the next line.
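   The check-then-act problem can be reproduced deterministically with a small model. This is a hypothetical sketch, not Pulsar code: the shared field `totalAvailablePermits` is modeled as an `IntSupplier` whose successive calls return the values another thread might leave behind, `READ_BATCH_SIZE` stands in for `readBatchSize`, and `isAtleastOneConsumerAvailable()` is omitted.

```java
import java.util.OptionalInt;
import java.util.function.IntSupplier;

// Hypothetical, simplified model of the racy pattern; not the actual Pulsar source.
class RacyReadSketch {
    // Hypothetical batch size standing in for readBatchSize.
    static final int READ_BATCH_SIZE = 100;

    // Each call to permits.getAsInt() models one read of the shared field
    // totalAvailablePermits; another thread may change it between the two reads.
    static OptionalInt messagesToRead(IntSupplier permits) {
        if (permits.getAsInt() > 0) {                                              // first read
            return OptionalInt.of(Math.min(permits.getAsInt(), READ_BATCH_SIZE)); // second read
        }
        return OptionalInt.empty(); // no read is scheduled
    }

    public static void main(String[] args) {
        // Simulated interleaving: the check sees 5 permits, then a concurrent
        // update drops the field to 0 before Math.min() reads it again.
        int[] values = {5, 0};
        int[] next = {0};
        OptionalInt result = messagesToRead(() -> values[next[0]++]);
        System.out.println("messagesToRead = " + result.getAsInt()); // prints "messagesToRead = 0"
    }
}
```

   The positive check passes, yet the computed count is 0, which is exactly the value the cursor rejects.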
   
   As a result, `messagesToRead` becomes 0 or less and `IllegalArgumentException` is thrown. When this happens, the read callback is never executed, so `havePendingRead` is never reset to `false` and message delivery stops.
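   Why a single exception stops delivery for good can be shown with a simplified, hypothetical dispatcher. It assumes, as described above, that the flag is set before the read is issued and cleared only in the completion callback. The `asyncReadEntriesOrWait()` here is a stand-in that rejects non-positive counts, not the real `ManagedCursorImpl` method.

```java
// Hypothetical, simplified dispatcher; not the actual Pulsar source.
class StuckFlagSketch {
    private boolean havePendingRead = false;

    // Stand-in for the cursor read: rejects counts <= 0 before the callback can run.
    // The real method completes asynchronously; here the callback runs inline.
    private void asyncReadEntriesOrWait(int numberOfEntriesToRead, Runnable onComplete) {
        if (numberOfEntriesToRead <= 0) {
            throw new IllegalArgumentException();
        }
        onComplete.run();
    }

    // Returns false when the read is skipped because a previous one is "still pending".
    boolean readMoreEntries(int messagesToRead) {
        if (havePendingRead) {
            // Corresponds to the DEBUG log "Cannot schedule next read until previous one is done"
            return false;
        }
        havePendingRead = true;
        // If this call throws, the callback never runs and nothing resets the flag.
        asyncReadEntriesOrWait(messagesToRead, () -> havePendingRead = false);
        return true;
    }

    boolean hasPendingRead() {
        return havePendingRead;
    }

    public static void main(String[] args) {
        StuckFlagSketch dispatcher = new StuckFlagSketch();
        try {
            dispatcher.readMoreEntries(0);
        } catch (IllegalArgumentException e) {
            System.out.println("read failed: " + e);
        }
        System.out.println("pending = " + dispatcher.hasPendingRead());      // prints "pending = true"
        System.out.println("scheduled = " + dispatcher.readMoreEntries(10)); // prints "scheduled = false"
    }
}
```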
   
   ### Modifications
   
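   Fixed the code that causes this bug by reading `totalAvailablePermits` into a local variable once, so that the check and the calculation use the same value: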
   ```diff
   -if (totalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
   -    int messagesToRead = Math.min(totalAvailablePermits, readBatchSize);
   +int currentTotalAvailablePermits = totalAvailablePermits;
   +if (currentTotalAvailablePermits > 0 && isAtleastOneConsumerAvailable()) {
   +    int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
   ```
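   The snapshot pattern from the diff can be applied to the same hypothetical `IntSupplier` model used to illustrate the race. This is a sketch, not the patched Pulsar method: `READ_BATCH_SIZE` is a hypothetical stand-in for `readBatchSize`, and `isAtleastOneConsumerAvailable()` is omitted.

```java
import java.util.OptionalInt;
import java.util.function.IntSupplier;

// Hypothetical model of the fixed pattern; not the actual Pulsar source.
class SnapshotReadSketch {
    // Hypothetical batch size standing in for readBatchSize.
    static final int READ_BATCH_SIZE = 100;

    static OptionalInt messagesToRead(IntSupplier permits) {
        // Read the shared value exactly once; later changes by other threads
        // cannot make the check and the calculation disagree.
        int currentTotalAvailablePermits = permits.getAsInt();
        if (currentTotalAvailablePermits > 0) {
            return OptionalInt.of(Math.min(currentTotalAvailablePermits, READ_BATCH_SIZE));
        }
        return OptionalInt.empty(); // no read is scheduled
    }

    public static void main(String[] args) {
        // Same simulated interleaving as before: 5 permits, then a drop to 0.
        int[] values = {5, 0};
        int[] next = {0};
        OptionalInt result = messagesToRead(() -> values[next[0]++]);
        System.out.println("messagesToRead = " + result.getAsInt()); // prints "messagesToRead = 5"
    }
}
```

   The snapshot may already be stale when it is used, but a positive check now always yields a positive `messagesToRead`.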
   
   Furthermore, if `messagesToRead` is 0 or less, it is corrected to 1 before being passed to `ManagedCursorImpl#asyncReadEntriesOrWait()`.
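   The additional guard can be sketched as a small helper. The name `sanitizeMessagesToRead` is hypothetical; this description only states the behavior (values of 0 or less become 1), not the exact code in the patch.

```java
// Hypothetical helper illustrating the guard; not the actual patch.
class ReadCountGuardSketch {
    // The cursor rejects counts <= 0, so never pass one; positive values are unchanged.
    static int sanitizeMessagesToRead(int messagesToRead) {
        return messagesToRead <= 0 ? 1 : messagesToRead;
    }

    public static void main(String[] args) {
        int[] samples = {-3, 0, 1, 42};
        for (int sample : samples) {
            System.out.println(sample + " -> " + sanitizeMessagesToRead(sample));
        }
    }
}
```

   Issuing a one-entry read instead of throwing presumably lets the read complete normally, so its callback can reset `havePendingRead`.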

