lhotari commented on code in PR #24610:
URL: https://github.com/apache/pulsar/pull/24610#discussion_r2257714854


##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1394,6 +1394,11 @@ private void processPayloadByProcessor(final 
BrokerEntryMetadata brokerEntryMeta
         final MessagePayloadContextImpl entryContext = 
MessagePayloadContextImpl.get(
                 brokerEntryMetadata, messageMetadata, messageId, this, 
redeliveryCount, ackSet, consumerEpoch);
         final AtomicInteger skippedMessages = new AtomicInteger(0);
+        if (this instanceof ZeroQueueConsumerImpl<T> && 
entryContext.isBatch()) {
+            this.receiveIndividualMessagesFromBatch(brokerEntryMetadata,
+                messageMetadata, redeliveryCount, ackSet, byteBuf, null, null, 
consumerEpoch, false);
+            return;
+        }

Review Comment:
   instead of modifying `ConsumerImpl`, modify the `processPayloadByProcessor` 
method to be a protected method so that it can be overridden in 
`ZeroQueueConsumerImpl`. 
   
   in the overridden method in `ZeroQueueConsumerImpl`, you can perform the 
check `if (!entryContext.isBatch()) {` and call the super method in the `else` 
block you can do the error handling. Instead of calling the 
`receiveIndividualMessagesFromBatch` method directly, extract the method for 
error handling and call it.
   
   I think that this way circular dependencies between ZeroQueueConsumerImpl 
and ConsumerImpl would be avoided.
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to