This is an automated email from the ASF dual-hosted git repository. cserwen pushed a commit to branch 5.0.0-beta in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/5.0.0-beta by this push: new 19996a024 bugfix : Returning minOffset when timestamp is larger than queue-unit max timestamp 19996a024 is described below commit 19996a0244663869580dc9b9864fd367bdaa7e38 Author: hankunming <hankunm...@xiaomi.com> AuthorDate: Wed Mar 30 17:56:18 2022 +0800 bugfix : Returning minOffset when timestamp is larger than queue-unit max timestamp --- .../main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java | 4 ++++ .../java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java index b956914cd..3db3c0e7e 100644 --- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java +++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java @@ -710,6 +710,10 @@ public class BatchConsumeQueue implements ConsumeQueueInterface, FileQueueLifeCy try { ByteBuffer byteBuffer = sbr.getByteBuffer(); int left = targetMinOffset.getIndexPos(), right = targetMaxOffset.getIndexPos(); + long maxQueueTimestamp = byteBuffer.getLong(right + MSG_STORE_TIME_OFFSET_INDEX); + if (timestamp >= maxQueueTimestamp) { + return byteBuffer.getLong(right + MSG_BASE_OFFSET_INDEX); + } int mid = binarySearchRight(byteBuffer, left, right, CQ_STORE_UNIT_SIZE, MSG_STORE_TIME_OFFSET_INDEX, timestamp); if (mid != -1) { return byteBuffer.getLong(mid + MSG_BASE_OFFSET_INDEX); diff --git a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java index 1c8e31fce..8b3b8ddb3 100644 --- a/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java +++ b/store/src/test/java/org/apache/rocketmq/store/queue/BatchConsumeQueueTest.java @@ -161,7 +161,7 @@ public 
class BatchConsumeQueueTest extends StoreTestBase { } end = System.currentTimeMillis(); Assert.assertTrue(end - start < 2000); - Assert.assertEquals(-1, batchConsumeQueue.getOffsetInQueueByTime(System.currentTimeMillis())); + Assert.assertEquals(199991, batchConsumeQueue.getOffsetInQueueByTime(System.currentTimeMillis())); batchConsumeQueue.destroy(); }