This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push: new ba3e65f SLING-12358 - Log blocking import errors (#148) ba3e65f is described below commit ba3e65f03acfa1ed095b0dba3b316bc2f4686b01 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Sun Jun 23 15:51:55 2024 +0200 SLING-12358 - Log blocking import errors (#148) --- .../journal/bookkeeper/BookKeeper.java | 4 ++ .../journal/bookkeeper/SubscriberMetrics.java | 7 +++ .../journal/bookkeeper/BookKeeperTest.java | 50 +++++++++++++++++----- 3 files changed, 51 insertions(+), 10 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java index 969968b..3e03a66 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java @@ -85,6 +85,7 @@ public class BookKeeper { private static final String SUBSERVICE_IMPORTER = "importer"; private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper"; private static final int RETRY_SEND_DELAY = 1000; + public static final int NUM_ERRORS_BLOCKING = 4; private final Logger log = LoggerFactory.getLogger(this.getClass()); private final ResourceResolverFactory resolverFactory; @@ -278,6 +279,9 @@ public class BookKeeper { removeFailedPackage(pkgMsg, offset); subscriberMetrics.getPermanentImportErrors().increment(); } else { + if (retries == NUM_ERRORS_BLOCKING) { // Only count after a few retries to allow transient errors to recover + subscriberMetrics.getBlockingImportErrors().increment(); + } packageRetries.increase(pubAgentName); throw new DistributionException(msg, e); } diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java index c28b4ca..08cb68c 100644 --- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java @@ -69,6 +69,9 @@ public class SubscriberMetrics { // Only counted in error queue setup private static final String PERMANENT_IMPORT_ERRORS = SUB_COMPONENT + "permanent_import_errors"; + + // Counts every package that fails more than n times and thus causes a blocked queue + private static final String BLOCKING_IMPORT_ERRORS = SUB_COMPONENT + "import_errors"; private static final String IMPORT_PRE_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_pre_process_request_count"; private static final String IMPORT_POST_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_post_process_success_count"; @@ -258,6 +261,10 @@ public class SubscriberMetrics { public Counter getPermanentImportErrors() { return metricsService.counter(getMetricName(PERMANENT_IMPORT_ERRORS, tags)); } + + public Counter getBlockingImportErrors() { + return metricsService.counter(getMetricName(BLOCKING_IMPORT_ERRORS, tags)); + } public void currentRetries(Supplier<Integer> retriesCallback) { metricsService.gauge(getMetricName(CURRENT_RETRIES, tags), retriesCallback); diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java index 02ad60b..a799683 100644 --- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java @@ -23,24 +23,19 @@ import static java.util.Collections.singletonList; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; 
import java.util.UUID; import java.util.function.Consumer; -import java.util.function.Supplier; import org.apache.sling.api.resource.LoginException; import org.apache.sling.api.resource.PersistenceException; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.api.resource.ResourceResolverFactory; import org.apache.sling.commons.metrics.Counter; -import org.apache.sling.commons.metrics.Gauge; -import org.apache.sling.commons.metrics.Histogram; -import org.apache.sling.commons.metrics.Meter; import org.apache.sling.commons.metrics.MetricsService; -import org.apache.sling.commons.metrics.Timer; import org.apache.sling.commons.metrics.internal.MetricsServiceImpl; import org.apache.sling.distribution.ImportPostProcessor; import org.apache.sling.distribution.ImportPreProcessor; @@ -55,17 +50,13 @@ import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; -import org.mockito.AdditionalMatchers; import org.mockito.Mock; import org.mockito.Mockito; -import org.mockito.internal.matchers.GreaterOrEqual; import org.mockito.invocation.InvocationOnMock; import org.mockito.junit.MockitoJUnitRunner; import org.mockito.stubbing.Answer; import org.osgi.service.event.EventAdmin; -import javassist.bytecode.analysis.Analyzer; - @RunWith(MockitoJUnitRunner.class) public class BookKeeperTest { @@ -141,6 +132,45 @@ public class BookKeeperTest { } } + @Test + public void testPackageImportErrorMetric() throws DistributionException, PersistenceException { + doThrow(IllegalStateException.class) + .when(packageHandler).apply(Mockito.any(ResourceResolver.class), Mockito.any(PackageMessage.class)); + Counter counter = subscriberMetrics.getBlockingImportErrors(); + assertThat(counter.getCount(), equalTo(0L)); + + for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) { + try { + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis()); + } catch (DistributionException e) { + } + } + + assertThat(counter.getCount(), equalTo(1L)); + } + + @Test + public void testPackageImportFailCurrentDuration() throws DistributionException, PersistenceException { + assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), equalTo(0L)); + doAnswer(new Answer<Void>() { + + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + Thread.sleep(500); + Long duration = subscriberMetrics.getCurrentImportDurationCallback().get(); + if (duration < 400L) { + throw new IllegalStateException("Should get valid duration"); + } + return null; + } + + }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), Mockito.any(PackageMessage.class)); + + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, currentTimeMillis()); + + assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), equalTo(0L)); + } + @Test public void testPackageImportCurrentDuration() throws DistributionException, PersistenceException { assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), equalTo(0L));