eolivelli commented on code in PR #16812:
URL: https://github.com/apache/pulsar/pull/16812#discussion_r930758141


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -528,135 +533,143 @@ public synchronized void readEntriesComplete(List<Entry> entries, Object ctx) {
             log.debug("[{}] Distributing {} messages to {} consumers", name, entries.size(), consumerList.size());
         }
 
+        // dispatch messages to a separate thread, but still in order for this subscription
+        // sendMessagesToConsumers is responsible for running broker-side filters
+        // that may be quite expensive
         if (serviceConfig.isDispatcherDispatchMessagesInSubscriptionThread()) {
-            // dispatch messages to a separate thread, but still in order for this subscription
-            // sendMessagesToConsumers is responsible for running broker-side filters
-            // that may be quite expensive
+            // setting sendInProgress here, because sendMessagesToConsumers will be executed
+            // in a separate thread, and we want to prevent more reads
+            sendInProgress = true;
             dispatchMessagesThread.execute(safeRun(() -> sendMessagesToConsumers(readType, entries)));
         } else {
             sendMessagesToConsumers(readType, entries);
         }
     }
 
     protected synchronized void sendMessagesToConsumers(ReadType readType, List<Entry> entries) {
-
-        if (needTrimAckedMessages()) {
-            cursor.trimDeletedEntries(entries);
-        }
-
-        int entriesToDispatch = entries.size();
-        // Trigger read more messages
-        if (entriesToDispatch == 0) {
-            readMoreEntries();
-            return;
-        }
-        final MessageMetadata[] metadataArray = entries.stream()
-                .map(entry -> Commands.peekAndCopyMessageMetadata(entry.getDataBuffer(), subscription.toString(), -1))
-                .toArray(MessageMetadata[]::new);
-        int remainingMessages = Stream.of(metadataArray).filter(Objects::nonNull)
-                .map(MessageMetadata::getNumMessagesInBatch)
-                .reduce(0, Integer::sum);
-
-        int start = 0;
-        long totalMessagesSent = 0;
-        long totalBytesSent = 0;
-        long totalEntries = 0;
-        int avgBatchSizePerMsg = remainingMessages > 0 ? Math.max(remainingMessages / entries.size(), 1) : 1;
-
-        int firstAvailableConsumerPermits, currentTotalAvailablePermits;
-        boolean dispatchMessage;
-        while (entriesToDispatch > 0) {
-            firstAvailableConsumerPermits = getFirstAvailableConsumerPermits();
-            currentTotalAvailablePermits = Math.max(totalAvailablePermits, firstAvailableConsumerPermits);
-            dispatchMessage = currentTotalAvailablePermits > 0 && firstAvailableConsumerPermits > 0;
-            if (!dispatchMessage) {
-                break;
+        sendInProgress = true;
+        try {
+            if (needTrimAckedMessages()) {

Review Comment:
   makes sense



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to