This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12647
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit bf35d402c2585dfd11a799b0e17cccaadd4189c4
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Feb 4 16:21:03 2025 +0100

    SLING-12647 - Queue should only report blocked after a few retries
---
 .../distribution/journal/queue/impl/PubQueue.java  |  8 ++--
 .../journal/queue/impl/PubQueueTest.java           | 44 +++++++++++++++++-----
 2 files changed, 40 insertions(+), 12 deletions(-)

diff --git a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
index 88c0ccd..cc8fea4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
@@ -43,7 +43,7 @@ import org.apache.sling.distribution.journal.queue.ClearCallback;
 import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.DistributionQueueState;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -53,6 +53,8 @@ import org.slf4j.LoggerFactory;
 
 @ParametersAreNonnullByDefault
 public class PubQueue implements DistributionQueue {
+       
+       private static final int BLOCKED_AFTER_NUM_ATTEMPTS = 3;
     
     private final Logger log = LoggerFactory.getLogger(this.getClass());
 
@@ -198,8 +200,8 @@ public class PubQueue implements DistributionQueue {
         DistributionQueueEntry headEntry = getHead();
         if (headEntry != null) {
             itemsCount = offsetQueue.getSize();
-            DistributionQueueItemState itemState = headEntry.getStatus().getItemState();
-            if (itemState == QUEUED) {
+            DistributionQueueItemStatus status = headEntry.getStatus();
+            if (status.getItemState() == QUEUED || status.getAttempts() < BLOCKED_AFTER_NUM_ATTEMPTS) {
                 queueState = RUNNING;
             } else {
                 queueState = BLOCKED;
diff --git a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index 379b494..88d41a2 100644
--- a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -22,6 +22,7 @@ import static 
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECOR
 import static 
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION;
 import static 
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP;
 import static 
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -39,7 +40,10 @@ import java.util.stream.StreamSupport;
 import org.apache.sling.distribution.journal.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.hamcrest.Matchers;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -55,32 +59,34 @@ public class PubQueueTest {
     private final Semaphore invoked = new Semaphore(0);
     private long lastClearOffset = 0L;
     private OffsetQueue<DistributionQueueItem> offsetQueue;
-    private PubQueue queue;
 
     @Before
     public void before () {
         offsetQueue = new OffsetQueueImpl<>();
-        queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
     }
 
     @Test
     public void testGetName() throws Exception {
+       PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         assertEquals(QUEUE_NAME, queue.getName());
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testAdd() throws Exception {
+       PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         queue.add(queueItem(1));
     }
     
     @Test
     public void testGetHeadEmpty() throws Exception {
+       PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         assertNull(queue.getHead());
     }
 
     @Test
     public void testGetHead() throws Exception {
         addEntries();
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         
         DistributionQueueEntry headEntry = queue.getHead();
 
@@ -91,7 +97,7 @@ public class PubQueueTest {
     @Test
     public void testGetItems() throws Exception {
         addEntries();
-        
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         Iterator<DistributionQueueEntry> entries = queue.getEntries(1, 
2).iterator();
         
         assertNotNull(entries);
@@ -104,25 +110,43 @@ public class PubQueueTest {
 
     @Test
     public void testGetItemWithIllegalArgument() {
+       PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         assertNull(queue.getEntry("illegal"));
         assertNull(queue.getEntry("illegal@argument"));
     }
 
+    @Test
+    public void testBlocked() throws Exception {
+        addEntries();
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 3, null, this::clearCallback);
+        DistributionQueueStatus status = queue.getStatus();
+        assertThat(status.getState(), Matchers.equalTo(DistributionQueueState.BLOCKED));
+    }
+    
+    @Test
+    public void testNotBlocked() throws Exception {
+        addEntries();
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 2, null, this::clearCallback);
+        DistributionQueueStatus status = queue.getStatus();
+        assertThat(status.getState(), Matchers.equalTo(DistributionQueueState.RUNNING));
+    }
+    
     @Test
     public void testGetItem() throws Exception {
         addEntries();
-        
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         String entryId = TOPIC + "-" + PARTITION + "@" + 200;
         DistributionQueueEntry queueEntry = queue.getEntry(entryId);
         
         assertNotNull(queueEntry);
         assertEquals(packageId(2), queueEntry.getItem().getPackageId());
+        
     }
 
     @Test
     public void testRemoveHead() throws Exception {
         addEntries();
-        
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
         DistributionQueueEntry removed = queue.remove(headEntryId);
         
@@ -133,7 +157,7 @@ public class PubQueueTest {
     @Test(expected = UnsupportedOperationException.class)
     public void testRemoveRandomItemFails() throws Exception {
         addEntries();
-        
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200));
         queue.remove(randomEntryId);
     }
@@ -141,6 +165,7 @@ public class PubQueueTest {
     @Test
     public void testRemoveSetOfRandomItemsWillClear() throws Exception {
         addEntries();
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
         String randomEntryId = 
EntryUtil.entryId(offsetQueue.getItem(offset(2)));
 
@@ -155,7 +180,7 @@ public class PubQueueTest {
     @Test
     public void testRemoveSetOfNonExistingItem() throws Exception {
         addEntries();
-        
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         Iterable<DistributionQueueEntry> removed = 
queue.remove(Collections.singleton("nonexisting-0@99999"));
         
         assertFalse(removed.iterator().hasNext());
@@ -164,7 +189,7 @@ public class PubQueueTest {
     @Test
     public void testClearAll() throws Exception {
         addEntries();
-
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         Iterable<DistributionQueueEntry> removed = queue.clear(-1);
         
         assertClearCallbackInvoked();
@@ -175,7 +200,7 @@ public class PubQueueTest {
     @Test
     public void testClearPartial() throws Exception {
         addEntries();
-        
+        PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         Iterable<DistributionQueueEntry> removed = queue.clear(2);
         
         assertClearCallbackInvoked();
@@ -185,6 +210,7 @@ public class PubQueueTest {
 
     @Test
     public void testGetType() throws Exception {
+       PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null, 
this::clearCallback);
         assertEquals(DistributionQueueType.ORDERED, queue.getType());
     }
 

Reply via email to