lhotari commented on code in PR #25010:
URL: https://github.com/apache/pulsar/pull/25010#discussion_r2707625425
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4833,13 +4795,24 @@ public Optional<TopicName> getShadowSourceTopic() {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
Review Comment:
This method now has a side effect; it is no longer a pure function for
determining whether the maximum delivery delay has been exceeded.
With the TTL stats logic added to this method, the code that calls it
becomes misleading:
https://github.com/apache/pulsar/blob/85625e0f100479dd95fb1311aeb52411b6b0a25d/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L645-L652
Keeping the new TTL stats together with the maximum delivery delay check
does help performance, since it avoids parsing the message metadata
multiple times. However, I think a better solution would be to keep the two
separate and move the message metadata parsing into the
`org.apache.pulsar.broker.service.Topic.PublishContext` implementations,
where it can be performed lazily.
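A rough sketch of the idea, not a proposed patch. All names here
(`MetadataView`, `LazyMetadataPublishContext`, `DelayChecks`) are
hypothetical stand-ins rather than existing Pulsar classes, and the TTL
condition is assumed to mirror the delay check, since the exact condition is
not visible in this hunk. The point is only that the metadata is parsed at
most once, on first use, while the two checks stay separate and free of side
effects:

```java
import java.util.function.Supplier;

/** Hypothetical stand-in for the few MessageMetadata fields used below. */
final class MetadataView {
    final boolean hasDeliverAtTime;
    final long deliverAtTime;
    final long publishTime;

    MetadataView(boolean hasDeliverAtTime, long deliverAtTime, long publishTime) {
        this.hasDeliverAtTime = hasDeliverAtTime;
        this.deliverAtTime = deliverAtTime;
        this.publishTime = publishTime;
    }
}

/**
 * Hypothetical publish context that parses the metadata lazily and caches it.
 * Not thread-safe; assumes one context per message, used from one thread.
 */
final class LazyMetadataPublishContext {
    // In the broker this would wrap the real metadata parsing (assumption).
    private final Supplier<MetadataView> parser;
    private MetadataView cached;
    private int parseCount;

    LazyMetadataPublishContext(Supplier<MetadataView> parser) {
        this.parser = parser;
    }

    MetadataView getMessageMetadata() {
        if (cached == null) {
            cached = parser.get(); // parsed only on first access
            parseCount++;
        }
        return cached;
    }

    /** Exposed only so the sketch can show that parsing happens once. */
    int parseCount() {
        return parseCount;
    }
}

/** Two independent, side-effect-free checks that share the cached metadata. */
final class DelayChecks {
    static boolean isExceedMaximumDeliveryDelay(MetadataView m, long maxDelayMs) {
        return maxDelayMs > 0
                && m.hasDeliverAtTime
                && m.deliverAtTime - m.publishTime > maxDelayMs;
    }

    // Assumed condition: the delay is longer than the configured TTL.
    static boolean isDelayedBeyondTtl(MetadataView m, Integer ttlSeconds) {
        return ttlSeconds != null
                && ttlSeconds > 0
                && m.hasDeliverAtTime
                && m.deliverAtTime - m.publishTime > ttlSeconds * 1000L;
    }
}
```

With this shape the caller can run the delay check and update the TTL stats
as two explicit steps, and neither step pays for a second parse.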
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4833,13 +4795,24 @@ public Optional<TopicName> getShadowSourceTopic() {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
- if (maxDeliveryDelayInMs > 0) {
- headersAndPayload.markReaderIndex();
- MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
- headersAndPayload.resetReaderIndex();
- return msgMetadata.hasDeliverAtTime()
- && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
+ if (maxDeliveryDelayInMs <= 0) {
+ return false;
+ }
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (!msgMetadata.hasDeliverAtTime()) {
+ return false;
}
+ long deliverAtTime = msgMetadata.getDeliverAtTime();
+ // count exceed ttl delayed messages
+ Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
+ if (messageTTLInSeconds != null && messageTTLInSeconds > 0
Review Comment:
Moving the TTL stats to a new method would be preferred, as mentioned in a
previous comment.
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -4833,13 +4795,24 @@ public Optional<TopicName> getShadowSourceTopic() {
protected boolean isExceedMaximumDeliveryDelay(ByteBuf headersAndPayload) {
if (isDelayedDeliveryEnabled()) {
long maxDeliveryDelayInMs = getDelayedDeliveryMaxDelayInMillis();
- if (maxDeliveryDelayInMs > 0) {
- headersAndPayload.markReaderIndex();
- MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
- headersAndPayload.resetReaderIndex();
- return msgMetadata.hasDeliverAtTime()
- && msgMetadata.getDeliverAtTime() - msgMetadata.getPublishTime() > maxDeliveryDelayInMs;
+ if (maxDeliveryDelayInMs <= 0) {
+ return false;
+ }
+ headersAndPayload.markReaderIndex();
+ MessageMetadata msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
+ headersAndPayload.resetReaderIndex();
+ if (!msgMetadata.hasDeliverAtTime()) {
+ return false;
}
+ long deliverAtTime = msgMetadata.getDeliverAtTime();
+ // count exceed ttl delayed messages
+ Integer messageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();
+ if (messageTTLInSeconds != null && messageTTLInSeconds > 0
Review Comment:
There should be a check before the message metadata is parsed that skips
the TTL check completely when message TTL is not set. Doing this before the
metadata is parsed avoids the parsing overhead when TTL is not set.
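Something along these lines, as a minimal sketch. `TtlStatsGuard` and its
method are hypothetical, the `LongSupplier` stands in for the metadata
parsing that yields the delivery delay, and the comparison against the TTL is
an assumption about the intended condition:

```java
import java.util.function.LongSupplier;

/** Hypothetical helper: decide on the TTL config before touching the metadata. */
final class TtlStatsGuard {
    /**
     * @param messageTtlInSeconds configured TTL, or null when it is not set
     * @param delayMillis lazily computes deliverAtTime - publishTime; in the
     *                    broker this is where the metadata would be parsed
     */
    static boolean exceedsTtl(Integer messageTtlInSeconds, LongSupplier delayMillis) {
        if (messageTtlInSeconds == null || messageTtlInSeconds <= 0) {
            return false; // TTL not set: return without parsing anything
        }
        return delayMillis.getAsLong() > messageTtlInSeconds * 1000L;
    }
}
```

When TTL is unset the supplier is never invoked, so the publish path incurs
no extra parsing cost.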