eolivelli commented on code in PR #15628:
URL: https://github.com/apache/pulsar/pull/15628#discussion_r998987315
##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java:
##########
@@ -77,8 +77,13 @@ public boolean expireMessages(int messageTTLInSeconds) {
cursor.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries,
entry -> {
try {
- long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
- return MessageImpl.isEntryExpired(messageTTLInSeconds, entryTimestamp);
+ MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(entry.getDataBuffer());
Review Comment:
I think we can save resources by not allocating this object.
We can move this logic into a new method,
`isEntryExpired(messageTTLInSeconds, entryTimestamp, delayTime)`.
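A minimal sketch of what such an overload could look like, assuming the caller reads the publish timestamp and the deliver-at time as primitives instead of wrapping them in an `EntryMetadata` object. The class name `EntryExpirySketch` and the extra `nowMillis` parameter are hypothetical (the latter only makes the example deterministic), and the delay comparison is inferred from the assertions in the test added by this PR, not taken from the final implementation:

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the static helper on MessageImpl; not the PR's actual code.
final class EntryExpirySketch {
    private EntryExpirySketch() {
    }

    // Takes primitives only, so no per-entry holder object has to be allocated.
    // nowMillis is an assumption added for testability; real code would likely
    // read System.currentTimeMillis() internally.
    public static boolean isEntryExpired(int messageTTLInSeconds, long entryTimestamp,
                                         long delayTime, long nowMillis) {
        if (messageTTLInSeconds == 0) {
            return false; // a TTL of 0 is treated here as "TTL disabled"
        }
        long ttlMillis = TimeUnit.SECONDS.toMillis(messageTTLInSeconds);
        boolean ttlElapsed = nowMillis > entryTimestamp + ttlMillis;
        if (delayTime <= 0) {
            return ttlElapsed; // not a delayed message: plain TTL check
        }
        // Assumed rule: a delayed message expires only if the TTL window
        // also reaches past its deliver-at time.
        return ttlElapsed && nowMillis + ttlMillis > delayTime;
    }
}
```

With a fixed `nowMillis`, a publish time of 1 and a deliver-at time 50 seconds ahead, this sketch reports the entry as expired for a 60-second TTL and as not expired for a 40-second TTL, which matches the two assertions in the new test.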
##########
pulsar-client/src/test/java/org/apache/pulsar/client/impl/MessageImplTest.java:
##########
@@ -484,6 +484,40 @@ public void testMessageBrokerAndEntryMetadataTimestampMissed() {
}
}
+ @Test(timeOut = 30000)
+ public void testDelayMessageConflictWithTtlPolicy() {
+ String data = "test-message";
+ ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
+ byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
+
+ long curTime = System.currentTimeMillis();
+ try {
+ MessageMetadata messageMetadata = new MessageMetadata()
+ .setSequenceId(1)
+ .setProducerName("testProducer")
+ .setPartitionKeyB64Encoded(false)
+ .setPublishTime(1)
+ .setDeliverAtTime(curTime + 50000);
+ byteBuf = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c, messageMetadata, byteBuf);
+
+ MessageImpl.EntryMetadata entryMetadata = MessageImpl.getEntryMetadata(byteBuf);
+ assertEquals(entryMetadata.getDelayTime(), curTime + 50000); // delay 50s
+ assertEquals(entryMetadata.getPublishTime(), 1);
+
+ // when we set the delay message time to 50s, then when the TTL policy is 60s, it is greater than the
+ // time set for the delayed message, so the delayed message can be expired.
+ assertTrue(MessageImpl.isEntryExpired(60,
+ entryMetadata.getPublishTime(), entryMetadata.getDelayTime()));
+
+ // when we set the delay message time to 50s, then when the TTL policy is 40s, it is less than the
+ // time set for the delayed message, so the delayed message cannot be expired.
+ assertFalse(MessageImpl.isEntryExpired(40,
+ entryMetadata.getPublishTime(), entryMetadata.getDelayTime()));
+ } catch (Exception e) {
+ fail();
Review Comment:
There is no need to call `fail()` here. Simply add `throws Exception` to the test
method signature and let any exception escape the method.
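To illustrate why this helps, here is a small framework-free sketch. All names in it are hypothetical, and the bare `AssertionError` is only an assumed stand-in for what a no-argument `fail()` produces. The point it shows: catching and failing discards the original exception, while declaring `throws Exception` lets the runner report the real cause and stack trace.

```java
// Hypothetical demonstration; does not depend on TestNG or Pulsar.
final class ExceptionPropagationSketch {
    private ExceptionPropagationSketch() {
    }

    // Stands in for any step of the test body that may throw.
    static void riskyStep() throws Exception {
        throw new IllegalStateException("metadata could not be parsed");
    }

    // Pattern from the diff: the original exception is swallowed.
    // The bare AssertionError models fail() with no message (an assumption).
    public static void testWithCatchAndFail() {
        try {
            riskyStep();
        } catch (Exception e) {
            throw new AssertionError();
        }
    }

    // Suggested pattern: no try/catch, the exception escapes unchanged.
    public static void testWithThrows() throws Exception {
        riskyStep();
    }
}
```

In the first variant the failure carries neither a message nor a cause, so whoever reads the report has to rerun the test to learn what went wrong. In the second, the `IllegalStateException` and its message reach the test runner directly.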
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]