This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12358
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 05cc95956afc620bda8d5aaa61bd039f65de5264
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Sun Jun 23 15:29:32 2024 +0200

    SLING-12358 - Log blocking import errors
---
 .../journal/bookkeeper/BookKeeper.java             |  4 ++
 .../journal/bookkeeper/SubscriberMetrics.java      |  7 +++
 .../journal/bookkeeper/BookKeeperTest.java         | 50 +++++++++++++++++-----
 3 files changed, 51 insertions(+), 10 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 969968b..3e03a66 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -85,6 +85,7 @@ public class BookKeeper {
     private static final String SUBSERVICE_IMPORTER = "importer";
     private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
     private static final int RETRY_SEND_DELAY = 1000;
+    public static final int NUM_ERRORS_BLOCKING = 4;
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
@@ -278,6 +279,9 @@ public class BookKeeper {
             removeFailedPackage(pkgMsg, offset);
             subscriberMetrics.getPermanentImportErrors().increment();
         } else {
+            if (retries == NUM_ERRORS_BLOCKING) { // Only count after a few 
retries to allow transient errors to recover
+                subscriberMetrics.getBlockingImportErrors().increment();
+            }
             packageRetries.increase(pubAgentName);
             throw new DistributionException(msg, e);
         }
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index c28b4ca..08cb68c 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -69,6 +69,9 @@ public class SubscriberMetrics {
 
     // Only counted in error queue setup
     private static final String PERMANENT_IMPORT_ERRORS = SUB_COMPONENT + 
"permanent_import_errors";
+    
+    // Counts every package that fails more than n times an thus causes a 
blocked queue
+    private static final String BLOCKING_IMPORT_ERRORS = SUB_COMPONENT + 
"import_errors";
 
     private static final String IMPORT_PRE_PROCESS_REQUEST_COUNT = 
SUB_COMPONENT + "import_pre_process_request_count";
     private static final String IMPORT_POST_PROCESS_SUCCESS_COUNT = 
SUB_COMPONENT + "import_post_process_success_count";
@@ -258,6 +261,10 @@ public class SubscriberMetrics {
     public Counter getPermanentImportErrors() { 
         return metricsService.counter(getMetricName(PERMANENT_IMPORT_ERRORS, 
tags));
     }
+    
+    public Counter getBlockingImportErrors() { 
+        return metricsService.counter(getMetricName(BLOCKING_IMPORT_ERRORS, 
tags));
+    }
 
     public void currentRetries(Supplier<Integer> retriesCallback) {
         metricsService.gauge(getMetricName(CURRENT_RETRIES, tags), 
retriesCallback);
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 02ad60b..a799683 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -23,24 +23,19 @@ import static java.util.Collections.singletonList;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.util.UUID;
 import java.util.function.Consumer;
-import java.util.function.Supplier;
 
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Counter;
-import org.apache.sling.commons.metrics.Gauge;
-import org.apache.sling.commons.metrics.Histogram;
-import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
-import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.commons.metrics.internal.MetricsServiceImpl;
 import org.apache.sling.distribution.ImportPostProcessor;
 import org.apache.sling.distribution.ImportPreProcessor;
@@ -55,17 +50,13 @@ import 
org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
-import org.mockito.AdditionalMatchers;
 import org.mockito.Mock;
 import org.mockito.Mockito;
-import org.mockito.internal.matchers.GreaterOrEqual;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.junit.MockitoJUnitRunner;
 import org.mockito.stubbing.Answer;
 import org.osgi.service.event.EventAdmin;
 
-import javassist.bytecode.analysis.Analyzer;
-
 @RunWith(MockitoJUnitRunner.class)
 public class BookKeeperTest {
 
@@ -141,6 +132,45 @@ public class BookKeeperTest {
         }
     }
     
+    @Test
+    public void testPackageImportErrorMetric() throws DistributionException, 
PersistenceException {
+        doThrow(IllegalStateException.class) 
+            .when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
+        Counter counter = subscriberMetrics.getBlockingImportErrors();
+        assertThat(counter.getCount(), equalTo(0L));
+        
+        for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) {
+            try {
+                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis());
+            } catch (DistributionException e) {
+            }
+        }
+        
+        assertThat(counter.getCount(), equalTo(1L));
+    }
+    
+    @Test
+    public void testPackageImportFailCurrentDuration() throws 
DistributionException, PersistenceException {
+        assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), 
equalTo(0L));
+        doAnswer(new Answer<Void>() {
+
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                Thread.sleep(500);
+                Long duration = 
subscriberMetrics.getCurrentImportDurationCallback().get();
+                if (duration < 400L) {
+                    throw new IllegalStateException("Should get valid 
duration");
+                }
+                return null;
+            }
+            
+        }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
+        
+        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis());
+        
+        assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), 
equalTo(0L));
+    }
+    
     @Test
     public void testPackageImportCurrentDuration() throws 
DistributionException, PersistenceException {
         assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), 
equalTo(0L));

Reply via email to