lhotari commented on code in PR #21798:
URL: https://github.com/apache/pulsar/pull/21798#discussion_r1435807296


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##########
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
             }
         }
 
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();
+                    if (msgMetadata.hasDeliverAtTime()
+                            && msgMetadata.getDeliverAtTime() - 
msgMetadata.getPublishTime() > maxDeliveryDelayInMs) {
+                        cnx.execute(() -> {
+                            cnx.getCommandSender().sendSendError(producerId, 
sequenceId,
+                                    ServerError.NotAllowedError,
+                                    String.format("Exceeds max allowed 
delivery delay of %s milliseconds",
+                                            maxDeliveryDelayInMs));
+                            cnx.completedSendOperation(false, 
headersAndPayload.readableBytes());
+                        });
+                        return false;
+                    }

Review Comment:
   I'm not sure about it, but there's a chance that completedSendOperation 
should only be called if the message was accepted for sending by returning true 
from this method. 



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java:
##########
@@ -263,6 +263,29 @@ public boolean checkAndStartPublish(long producerId, long 
sequenceId, ByteBuf he
             }
         }
 
+        if (topic.isPersistent()) {
+            PersistentTopic pTopic = (PersistentTopic) topic;
+            if (pTopic.isDelayedDeliveryEnabled()) {
+                long maxDeliveryDelayInMs = 
pTopic.getDelayedDeliveryMaxDelayInMillis();
+                if (maxDeliveryDelayInMs > 0) {
+                    headersAndPayload.markReaderIndex();
+                    MessageMetadata msgMetadata = 
Commands.parseMessageMetadata(headersAndPayload);
+                    headersAndPayload.resetReaderIndex();

Review Comment:
   This has a cost so it would be good to have this check later in the send 
processing so that there wouldn't be an extra parsing step. Did you check if it 
would be possible to put this logic is later phase of sending?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to