This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch SLING-12357
in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 0477e32e4e22ad89e0f5fc73c870a95676c55cb0
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Sun Jun 23 19:24:58 2024 +0200

    SLING-12357 - Add logging if import takes too long
---
 pom.xml                                               |  5 +++++
 .../distribution/journal/bookkeeper/BookKeeper.java   | 19 ++++++++++++++++---
 .../impl/subscriber/DistributionSubscriber.java       |  3 ++-
 .../journal/bookkeeper/BookKeeperTest.java            | 19 ++++++++++---------
 4 files changed, 33 insertions(+), 13 deletions(-)

diff --git a/pom.xml b/pom.xml
index 53b24b2..cf3ce4a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -303,6 +303,11 @@
             <version>1.4.14</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-simple</artifactId>
+            <scope>test</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.discovery.impl</artifactId>
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 3e03a66..1ee7f06 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -29,10 +29,12 @@ import static org.apache.sling.distribution.event.DistributionEventProperties.DI
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
 import org.apache.sling.api.resource.LoginException;
@@ -86,6 +88,7 @@ public class BookKeeper {
     private static final String SUBSERVICE_BOOKKEEPER = "bookkeeper";
     private static final int RETRY_SEND_DELAY = 1000;
     public static final int NUM_ERRORS_BLOCKING = 4;
+    public static final Duration IMPORT_TIME_WARN_LEVEL = 
Duration.ofMinutes(5);
 
     private final Logger log = LoggerFactory.getLogger(this.getClass());
     private final ResourceResolverFactory resolverFactory;
@@ -104,6 +107,7 @@ public class BookKeeper {
     private final ImportPostProcessor importPostProcessor;
     private final InvalidationProcessor invalidationProcessor;
     private final AtomicLong currentImportStartTime;
+    private final AtomicReference<PackageMessage> currentImportPackage;
     private int skippedCounter = 0;
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
SubscriberMetrics subscriberMetrics,
@@ -128,6 +132,7 @@ public class BookKeeper {
         this.invalidationProcessor = invalidationProcessor;
         
this.subscriberMetrics.currentImportDuration(this::getCurrentImportDuration);
         this.currentImportStartTime = new AtomicLong();
+        this.currentImportPackage = new AtomicReference<>();
         log.info("Started bookkeeper {}.", config);
     }
     
@@ -147,13 +152,14 @@ public class BookKeeper {
      * failing. For those packages importers, we aim at processing packages at 
least
      * once, thanks to the order in which the content updates are applied.
      */
-    public void importPackage(PackageMessage pkgMsg, long offset, long 
createdTime) throws DistributionException {
+    public void importPackage(PackageMessage pkgMsg, long offset, long 
createdTime, long importStartTime) throws DistributionException {
         log.debug("Importing distribution package {} at offset={}", pkgMsg, 
offset);
         try (Timer.Context context = 
subscriberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
             // Execute the pre-processor
             preProcess(pkgMsg);
-            this.currentImportStartTime.set(System.currentTimeMillis());
+            this.currentImportStartTime.set(importStartTime);
+            this.currentImportPackage.set(pkgMsg);
             packageHandler.apply(importerResolver, pkgMsg);
             if (config.isEditable()) {
                 storeStatus(importerResolver, new 
PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
@@ -504,6 +510,13 @@ public class BookKeeper {
 
     private Long getCurrentImportDuration() {
         long importStartTime = this.currentImportStartTime.get();
-        return importStartTime == 0L ? 0L : System.currentTimeMillis() - 
importStartTime;
+        if (importStartTime == 0L) {
+            return 0L; // No import running
+        }
+        long currentImportDurationMs = System.currentTimeMillis() - 
importStartTime;
+        if (currentImportDurationMs > IMPORT_TIME_WARN_LEVEL.toMillis()) {
+            log.warn("Import of packageId={} takes currentImportTimeSeconds={} 
which is longer than warnLevelSeconds={}", currentImportPackage.get(), 
currentImportDurationMs / 1000, IMPORT_TIME_WARN_LEVEL.toSeconds());
+        }
+        return currentImportDurationMs;
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index ffe639b..a36cc56 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -395,7 +395,8 @@ public class DistributionSubscriber {
             } else if (type == INVALIDATE) {
                 bookKeeper.invalidateCache(pkgMsg, info.getOffset());
             } else {
-                bookKeeper.importPackage(pkgMsg, info.getOffset(), 
info.getCreateTime());
+                long importStartTime = System.currentTimeMillis();
+                bookKeeper.importPackage(pkgMsg, info.getOffset(), 
info.getCreateTime(), importStartTime);
             }
         } finally {
             idleCheck.idle();
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index a799683..daed3bd 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.time.Duration;
 import java.util.UUID;
 import java.util.function.Consumer;
 
@@ -126,14 +127,14 @@ public class BookKeeperTest {
     @Test
     public void testPackageImport() throws DistributionException {
         try {
-            
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis());
+            
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis(), currentTimeMillis());
         } finally {
             assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
         }
     }
     
     @Test
-    public void testPackageImportErrorMetric() throws DistributionException, 
PersistenceException {
+    public void testPackageBlockingImportErrorMetric() throws 
DistributionException, PersistenceException {
         doThrow(IllegalStateException.class) 
             .when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         Counter counter = subscriberMetrics.getBlockingImportErrors();
@@ -141,7 +142,7 @@ public class BookKeeperTest {
         
         for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) {
             try {
-                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis());
+                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis(), currentTimeMillis());
             } catch (DistributionException e) {
             }
         }
@@ -156,9 +157,8 @@ public class BookKeeperTest {
 
             @Override
             public Void answer(InvocationOnMock invocation) throws Throwable {
-                Thread.sleep(500);
                 Long duration = 
subscriberMetrics.getCurrentImportDurationCallback().get();
-                if (duration < 400L) {
+                if (duration < Duration.ofMinutes(6).toMillis()) {
                     throw new IllegalStateException("Should get valid 
duration");
                 }
                 return null;
@@ -166,7 +166,8 @@ public class BookKeeperTest {
             
         }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         
-        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis());
+        long simulatedStartTime = currentTimeMillis() - 
Duration.ofMinutes(6).toMillis();
+        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
simulatedStartTime, simulatedStartTime);
         
         assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), 
equalTo(0L));
     }
@@ -178,9 +179,8 @@ public class BookKeeperTest {
 
             @Override
             public Void answer(InvocationOnMock invocation) throws Throwable {
-                Thread.sleep(500);
                 Long duration = 
subscriberMetrics.getCurrentImportDurationCallback().get();
-                if (duration < 400L) {
+                if (duration < Duration.ofMinutes(1).toMillis()) {
                     throw new IllegalStateException("Should get valid 
duration");
                 }
                 return null;
@@ -188,7 +188,8 @@ public class BookKeeperTest {
             
         }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         
-        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis());
+        long simulatedStartTime = currentTimeMillis() - 
Duration.ofMinutes(1).toMillis();
+        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis(), simulatedStartTime);
         
         assertThat(subscriberMetrics.getCurrentImportDurationCallback().get(), 
equalTo(0L));
     }

Reply via email to