This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 61f0726 SLING-12647 - Queue should only report blocked after a few
retries (#159)
61f0726 is described below
commit 61f0726eb63e1ab76c8707319b93dcbbc838833f
Author: Christian Schneider <[email protected]>
AuthorDate: Tue Feb 4 16:39:12 2025 +0100
SLING-12647 - Queue should only report blocked after a few retries (#159)
* SLING-12647 - Queue should only report blocked after a few retries
* SLING-12647 - Queue should only report blocked after a few retries
---
.../distribution/journal/queue/impl/PubQueue.java | 11 ++++--
.../journal/queue/impl/PubQueueTest.java | 44 +++++++++++++++++-----
2 files changed, 42 insertions(+), 13 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
index 88c0ccd..95270a7 100644
---
a/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
+++
b/src/main/java/org/apache/sling/distribution/journal/queue/impl/PubQueue.java
@@ -44,6 +44,7 @@ import
org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.DistributionQueueState;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -53,6 +54,8 @@ import org.slf4j.LoggerFactory;
@ParametersAreNonnullByDefault
public class PubQueue implements DistributionQueue {
+
+ private static final int BLOCKED_AFTER_NUM_ATTEMPTS = 3;
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -198,11 +201,11 @@ public class PubQueue implements DistributionQueue {
DistributionQueueEntry headEntry = getHead();
if (headEntry != null) {
itemsCount = offsetQueue.getSize();
- DistributionQueueItemState itemState =
headEntry.getStatus().getItemState();
- if (itemState == QUEUED) {
- queueState = RUNNING;
+ DistributionQueueItemStatus status = headEntry.getStatus();
+ if (status.getItemState() == DistributionQueueItemState.ERROR &&
status.getAttempts() >= BLOCKED_AFTER_NUM_ATTEMPTS) {
+ queueState = BLOCKED;
} else {
- queueState = BLOCKED;
+ queueState = RUNNING;
}
} else {
itemsCount = 0;
diff --git
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
index 379b494..88d41a2 100644
---
a/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/queue/impl/PubQueueTest.java
@@ -22,6 +22,7 @@ import static
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECOR
import static
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_PARTITION;
import static
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TIMESTAMP;
import static
org.apache.sling.distribution.journal.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -39,7 +40,10 @@ import java.util.stream.StreamSupport;
import org.apache.sling.distribution.journal.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.hamcrest.Matchers;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
@@ -55,32 +59,34 @@ public class PubQueueTest {
private final Semaphore invoked = new Semaphore(0);
private long lastClearOffset = 0L;
private OffsetQueue<DistributionQueueItem> offsetQueue;
- private PubQueue queue;
@Before
public void before () {
offsetQueue = new OffsetQueueImpl<>();
- queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
}
@Test
public void testGetName() throws Exception {
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
assertEquals(QUEUE_NAME, queue.getName());
}
@Test(expected = UnsupportedOperationException.class)
public void testAdd() throws Exception {
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
queue.add(queueItem(1));
}
@Test
public void testGetHeadEmpty() throws Exception {
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
assertNull(queue.getHead());
}
@Test
public void testGetHead() throws Exception {
addEntries();
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
DistributionQueueEntry headEntry = queue.getHead();
@@ -91,7 +97,7 @@ public class PubQueueTest {
@Test
public void testGetItems() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
Iterator<DistributionQueueEntry> entries = queue.getEntries(1,
2).iterator();
assertNotNull(entries);
@@ -104,25 +110,43 @@ public class PubQueueTest {
@Test
public void testGetItemWithIllegalArgument() {
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
assertNull(queue.getEntry("illegal"));
assertNull(queue.getEntry("illegal@argument"));
}
+ @Test
+ public void testBlocked() throws Exception {
+ addEntries();
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 3, null,
this::clearCallback);
+ DistributionQueueStatus status = queue.getStatus();
+ assertThat(status.getState(),
Matchers.equalTo(DistributionQueueState.BLOCKED));
+ }
+
+ @Test
+ public void testNotBlocked() throws Exception {
+ addEntries();
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 2, null,
this::clearCallback);
+ DistributionQueueStatus status = queue.getStatus();
+ assertThat(status.getState(),
Matchers.equalTo(DistributionQueueState.RUNNING));
+ }
+
@Test
public void testGetItem() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
String entryId = TOPIC + "-" + PARTITION + "@" + 200;
DistributionQueueEntry queueEntry = queue.getEntry(entryId);
assertNotNull(queueEntry);
assertEquals(packageId(2), queueEntry.getItem().getPackageId());
+
}
@Test
public void testRemoveHead() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
DistributionQueueEntry removed = queue.remove(headEntryId);
@@ -133,7 +157,7 @@ public class PubQueueTest {
@Test(expected = UnsupportedOperationException.class)
public void testRemoveRandomItemFails() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200));
queue.remove(randomEntryId);
}
@@ -141,6 +165,7 @@ public class PubQueueTest {
@Test
public void testRemoveSetOfRandomItemsWillClear() throws Exception {
addEntries();
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
String randomEntryId =
EntryUtil.entryId(offsetQueue.getItem(offset(2)));
@@ -155,7 +180,7 @@ public class PubQueueTest {
@Test
public void testRemoveSetOfNonExistingItem() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
Iterable<DistributionQueueEntry> removed =
queue.remove(Collections.singleton("nonexisting-0@99999"));
assertFalse(removed.iterator().hasNext());
@@ -164,7 +189,7 @@ public class PubQueueTest {
@Test
public void testClearAll() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
Iterable<DistributionQueueEntry> removed = queue.clear(-1);
assertClearCallbackInvoked();
@@ -175,7 +200,7 @@ public class PubQueueTest {
@Test
public void testClearPartial() throws Exception {
addEntries();
-
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
Iterable<DistributionQueueEntry> removed = queue.clear(2);
assertClearCallbackInvoked();
@@ -185,6 +210,7 @@ public class PubQueueTest {
@Test
public void testGetType() throws Exception {
+ PubQueue queue = new PubQueue(QUEUE_NAME, offsetQueue, 0, null,
this::clearCallback);
assertEquals(DistributionQueueType.ORDERED, queue.getType());
}