lhotari commented on code in PR #24610:
URL: https://github.com/apache/pulsar/pull/24610#discussion_r2257714854
##########
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java:
##########
@@ -1394,6 +1394,11 @@ private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMeta
         final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
                 brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet, consumerEpoch);
         final AtomicInteger skippedMessages = new AtomicInteger(0);
+        if (this instanceof ZeroQueueConsumerImpl<T> && entryContext.isBatch()) {
+            this.receiveIndividualMessagesFromBatch(brokerEntryMetadata,
+                    messageMetadata, redeliveryCount, ackSet, byteBuf, null, null, consumerEpoch, false);
+            return;
+        }
Review Comment:
Instead of adding an `instanceof` check to `ConsumerImpl`, make the
`processPayloadByProcessor` method protected so that it can be overridden in
`ZeroQueueConsumerImpl`.
In the overriding method in `ZeroQueueConsumerImpl`, check
`if (!entryContext.isBatch()) {` and call the super method; in the `else`
block, do the error handling. Instead of calling the
`receiveIndividualMessagesFromBatch` method directly, extract the error
handling into its own method and call that.
I think this would avoid a circular dependency between
`ZeroQueueConsumerImpl` and `ConsumerImpl`.
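A minimal sketch of the shape I have in mind, using simplified stand-in
classes rather than the real Pulsar code. The names `SketchConsumer`,
`SketchZeroQueueConsumer` and `handleUnsupportedBatch` are hypothetical, and
the batch flag is passed in as a parameter here only to keep the example
self-contained; in the actual code it would come from `entryContext.isBatch()`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for ConsumerImpl (not the real class).
class SketchConsumer {
    final List<String> delivered = new ArrayList<>();

    // Protected instead of private, so a subclass can override it.
    // The base class no longer needs to know about any subclass.
    protected void processPayloadByProcessor(String payload, boolean isBatch) {
        delivered.add(payload);
    }
}

// Hypothetical stand-in for ZeroQueueConsumerImpl (not the real class).
class SketchZeroQueueConsumer extends SketchConsumer {
    final List<String> rejected = new ArrayList<>();

    @Override
    protected void processPayloadByProcessor(String payload, boolean isBatch) {
        if (!isBatch) {
            // Non-batch messages take the normal path in the base class.
            super.processPayloadByProcessor(payload, isBatch);
        } else {
            // Batch messages are handled by the extracted error-handling method.
            handleUnsupportedBatch(payload);
        }
    }

    // Extracted error handling (assumed name); here it only records the payload.
    private void handleUnsupportedBatch(String payload) {
        rejected.add(payload);
    }
}
```

With this layout the dependency points in one direction only: the subclass
refers to the base class, and the base class contains no reference to the
subclass.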