This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12647
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit bf35d402c2585dfd11a799b0e17cccaadd4189c4 Author: Christian Schneider <[email protected]> AuthorDate: Tue Feb 4 16:21:03 2025 +0100 SLING-12647 - Queue should only report blocked after a few retries --- .../distribution/journal/queue/impl/PubQueue.java | 8 ++-- .../journal/queue/impl/PubQueueTest.java | 44 +++++++++++++++++----- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java index 88c0ccd..cc8fea4 100644 --- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java +++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java @@ -43,7 +43,7 @@ import org.apache.sling.distribution.journal.queue.ClearCallback; import org.apache.sling.distribution.journal.queue.OffsetQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; -import org.apache.sling.distribution.queue.DistributionQueueItemState; +import org.apache.sling.distribution.queue.DistributionQueueItemStatus; import org.apache.sling.distribution.queue.DistributionQueueState; import org.apache.sling.distribution.queue.DistributionQueueStatus; import org.apache.sling.distribution.queue.DistributionQueueType; @@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory; @ParametersAreNonnullByDefault public class PubQueue implements DistributionQueue { + + private static final int BLOCKED_AFTER_NUM_ATTEMPTS = 3; private final Logger log = LoggerFactory.getLogger(this.getClass()); @@ -198,8 +200,8 @@ public class PubQueue implements DistributionQueue { DistributionQueueEntry headEntry = getHead(); if (headEntry != null) { itemsCount = offsetQueue.getSize(); - DistributionQueueItemState itemState = headEntry.getStatus().getItemState(); - if (itemState == QUEUED) { + DistributionQueueItemStatus status = headEntry.getStatus(); + if 
(status.getItemState() == QUEUED || status.getAttempts() < BLOCKED_AFTER_NUM_ATTEMPTS) { queueState = RUNNING; } else { queueState = BLOCKED; diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java index 379b494..88d41a2 100644 --- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java @@ -22,6 +22,7 @@ import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECOR import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION; import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP; import static org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; @@ -39,7 +40,10 @@ import java.util.stream.StreamSupport; import org.apache.sling.distribution.journal.queue.OffsetQueue; import org.apache.sling.distribution.queue.DistributionQueueEntry; import org.apache.sling.distribution.queue.DistributionQueueItem; +import org.apache.sling.distribution.queue.DistributionQueueState; +import org.apache.sling.distribution.queue.DistributionQueueStatus; import org.apache.sling.distribution.queue.DistributionQueueType; +import org.hamcrest.Matchers; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -55,32 +59,34 @@ public class PubQueueTest { private final Semaphore invoked = new Semaphore(0); private long lastClearOffset = 0L; private OffsetQueue<DistributionQueueItem> offsetQueue; - private PubQueue queue; @Before public void before () { offsetQueue = new OffsetQueueImpl<>(); - queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback); } @Test public void testGetName() throws Exception { + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); assertEquals(QUEUE_NAME, queue.getName()); } @Test(expected = UnsupportedOperationException.class) public void testAdd() throws Exception { + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); queue.add(queueItem(1)); } @Test public void testGetHeadEmpty() throws Exception { + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); assertNull(queue.getHead()); } @Test public void testGetHead() throws Exception { addEntries(); + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); DistributionQueueEntry headEntry = queue.getHead(); @@ -91,7 +97,7 @@ public class PubQueueTest { @Test public void testGetItems() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); Iterator<DistributionQueueEntry> entries = queue.getEntries(1, 2).iterator(); assertNotNull(entries); @@ -104,25 +110,43 @@ public class PubQueueTest { @Test public void testGetItemWithIllegalArgument() { + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); assertNull(queue.getEntry("illegal")); assertNull(queue.getEntry("illegal@argument")); } + @Test + public void testBlocked() throws Exception { + addEntries(); + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 3, null, this::clearCallback); + DistributionQueueStatus status = queue.getStatus(); + assertThat(status.getState(), Matchers.equalTo(DistributionQueueState.BLOCKED)); + } + + @Test + public void testNotBlocked() throws Exception { + addEntries(); + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 2, null, this::clearCallback); + DistributionQueueStatus status = queue.getStatus(); + assertThat(status.getState(), Matchers.equalTo(DistributionQueueState.RUNNING)); + } + @Test 
public void testGetItem() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); String entryId = TOPIC + "-" + PARTITION + "@" + 200; DistributionQueueEntry queueEntry = queue.getEntry(entryId); assertNotNull(queueEntry); assertEquals(packageId(2), queueEntry.getItem().getPackageId()); + } @Test public void testRemoveHead() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem()); DistributionQueueEntry removed = queue.remove(headEntryId); @@ -133,7 +157,7 @@ public class PubQueueTest { @Test(expected = UnsupportedOperationException.class) public void testRemoveRandomItemFails() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200)); queue.remove(randomEntryId); } @@ -141,6 +165,7 @@ public class PubQueueTest { @Test public void testRemoveSetOfRandomItemsWillClear() throws Exception { addEntries(); + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem()); String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(offset(2))); @@ -155,7 +180,7 @@ public class PubQueueTest { @Test public void testRemoveSetOfNonExistingItem() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); Iterable<DistributionQueueEntry> removed = queue.remove(Collections.singleton("nonexisting-0@99999")); assertFalse(removed.iterator().hasNext()); @@ -164,7 +189,7 @@ public class PubQueueTest { @Test public void testClearAll() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); Iterable<DistributionQueueEntry> removed = 
queue.clear(-1); assertClearCallbackInvoked(); @@ -175,7 +200,7 @@ public class PubQueueTest { @Test public void testClearPartial() throws Exception { addEntries(); - + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); Iterable<DistributionQueueEntry> removed = queue.clear(2); assertClearCallbackInvoked(); @@ -185,6 +210,7 @@ public class PubQueueTest { @Test public void testGetType() throws Exception { + PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, this::clearCallback); assertEquals(DistributionQueueType.ORDERED, queue.getType()); }
