This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch master
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7a5d8  SLING-12689 - Add more parameters to callback (#171)
3c7a5d8 is described below

commit 3c7a5d8a4542480ff93dfbf745b28bb982b1f68e
Author: Christian Schneider <[email protected]>
AuthorDate: Fri Mar 28 08:11:08 2025 +0100

    SLING-12689 - Add more parameters to callback (#171)
---
 .../journal/bookkeeper/BookKeeper.java             | 26 +++++++++++-----------
 .../impl/subscriber/DistributionSubscriber.java    |  8 ++++---
 .../impl/subscriber/NoopDistributionCallback.java  | 10 +++++++--
 .../journal/bookkeeper/BookKeeperTest.java         | 17 ++++++++------
 4 files changed, 36 insertions(+), 25 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index a2f9866..a9d6c29 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -27,6 +27,7 @@ import static 
org.apache.sling.distribution.event.DistributionEventProperties.*;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.time.Duration;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -155,13 +156,13 @@ public class BookKeeper {
      * failing. For those packages importers, we aim at processing packages at 
least
      * once, thanks to the order in which the content updates are applied.
      */
-    public void importPackage(PackageMessage pkgMsg, long offset, long 
createdTime, long importStartTime) throws DistributionException {
+    public void importPackage(PackageMessage pkgMsg, long offset, Date 
createdTime, Date importStartTime) throws DistributionException {
         log.debug("Importing distribution package {} at offset={}", pkgMsg, 
offset);
         try (Timer.Context context = 
subscriberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
             // Execute the pre-processor
             preProcess(pkgMsg);
-            subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg, 
offset, importStartTime));
+            subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg, 
offset, importStartTime.getTime()));
             packageHandler.apply(importerResolver, pkgMsg);
             if (config.isEditable()) {
                 storeStatus(importerResolver, new 
PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
@@ -169,7 +170,7 @@ public class BookKeeper {
             storeOffset(importerResolver, offset);
             importerResolver.commit();
             
subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
-            
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - 
createdTime), TimeUnit.MILLISECONDS);
+            
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - 
createdTime.getTime()), TimeUnit.MILLISECONDS);
             
             // Execute the post-processor
             postProcess(pkgMsg);
@@ -178,19 +179,18 @@ public class BookKeeper {
 
             Event event = new AppliedEvent(pkgMsg, 
config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
-            long currentImporturationMs = System.currentTimeMillis() - 
importStartTime;
-            Date createdDate = new Date(createdTime);
-            log.info("Imported distribution package {} at offset={} took 
importDurationMs={} created={}", pkgMsg, offset, currentImporturationMs, 
createdDate);
+            Duration currentImporturation = 
Duration.ofMillis(System.currentTimeMillis() - importStartTime.getTime());
+            log.info("Imported distribution package {} at offset={} took 
importDurationMs={} created={}", pkgMsg, offset, 
currentImporturation.toMillis(), createdTime);
             
subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(), 
Status.IMPORTED).increment();
-            distributionCallback.success(pkgMsg);
+            distributionCallback.success(pkgMsg, offset, createdTime, 
currentImporturation);
         } catch (DistributionException | LoginException | IOException | 
RuntimeException | ImportPreProcessException |ImportPostProcessException e) {
-            failure(pkgMsg, offset, e);
+            failure(pkgMsg, offset, createdTime, e);
         } finally {
             subscriberMetrics.clearCurrentImport();
         }
     }
 
-    public void invalidateCache(PackageMessage pkgMsg, long offset, long 
importStartTime) throws DistributionException {
+    public void invalidateCache(PackageMessage pkgMsg, long offset, Date 
createdTime, Date importStartTime) throws DistributionException {
         log.debug("Invalidating the cache for the package {} at offset={}", 
pkgMsg, offset);
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
             Map<String, Object> props = 
this.buildProcessorPropertiesFromMessage(pkgMsg);
@@ -211,14 +211,14 @@ public class BookKeeper {
 
             Event event = new AppliedEvent(pkgMsg, 
config.getSubAgentName()).toEvent();
             eventAdmin.postEvent(event);
-            long currentImporturationMs = System.currentTimeMillis() - 
importStartTime;
+            long currentImporturationMs = System.currentTimeMillis() - 
importStartTime.getTime();
             log.info("Invalidated the cache for the package {} at offset={}. 
This took importDurationMs={}", pkgMsg, offset, currentImporturationMs);
 
             
subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(), 
Status.IMPORTED).increment();
             
subscriberMetrics.getInvalidationProcessDuration().update((currentTimeMillis() 
- invalidationStartTime), TimeUnit.MILLISECONDS);
             subscriberMetrics.getInvalidationProcessSuccess().increment();
         } catch (LoginException | PersistenceException | 
InvalidationProcessException | RuntimeException e) {
-            failure(pkgMsg, offset, e);
+            failure(pkgMsg, offset, createdTime, e);
         }
     }
 
@@ -270,7 +270,7 @@ public class BookKeeper {
      *
      * @throws DistributionException if the package should be retried
      */
-    private void failure(PackageMessage pkgMsg, long offset, Exception e) 
throws DistributionException {
+    private void failure(PackageMessage pkgMsg, long offset, Date createdTime, 
Exception e) throws DistributionException {
         subscriberMetrics.getFailedPackageImports().mark();
 
         String pubAgentName = pkgMsg.getPubAgentName();
@@ -285,7 +285,7 @@ public class BookKeeper {
         } catch (Exception e2) {
             log.warn("Error sending log message", e2);
         }
-        distributionCallback.failure(pkgMsg, retries, giveUp, e);
+        distributionCallback.failure(pkgMsg, retries, createdTime, retries, 
giveUp, e);
         if (giveUp) {
             log.warn(msg, e);
             removeFailedPackage(pkgMsg, offset);
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 234b35a..0e8d389 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Date;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -324,13 +325,14 @@ public class DistributionSubscriber {
         try {
                this.state.set(DistributionAgentState.RUNNING);
             idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()), 
info.getCreateTime());
-            long importStartTime = System.currentTimeMillis();
+            Date importStartTime = new Date();
+            Date createdTime = new Date(info.getCreateTime());
             if (skip) {
                 bookKeeper.removePackage(pkgMsg, info.getOffset());
             } else if (type == INVALIDATE) {
-                bookKeeper.invalidateCache(pkgMsg, info.getOffset(), 
importStartTime);
+                bookKeeper.invalidateCache(pkgMsg, info.getOffset(), 
createdTime, importStartTime);
             } else {
-                bookKeeper.importPackage(pkgMsg, info.getOffset(), 
info.getCreateTime(), importStartTime);
+                bookKeeper.importPackage(pkgMsg, info.getOffset(), 
createdTime, importStartTime);
             }
             blockingSendStoredStatus();
         } finally {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
index 752201b..33b954a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
@@ -18,6 +18,9 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
+import java.time.Duration;
+import java.util.Date;
+
 import org.apache.sling.distribution.journal.DistributionCallback;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.osgi.service.component.annotations.Component;
@@ -25,14 +28,17 @@ import org.osgi.service.component.annotations.Component;
 @Component
 public class NoopDistributionCallback implements DistributionCallback {
 
+
        @Override
-       public void success(PackageMessage packageMessage) {
+       public void success(PackageMessage packageMessage, long offset, Date 
createdDate, Duration importuration) {
                // NOOP
        }
 
        @Override
-       public void failure(PackageMessage packageMessage, int numRetries, 
boolean willDiscard, Exception ex) {
+       public void failure(PackageMessage packageMessage, long offset, Date 
createdDate, int numRetries,
+                       boolean willDiscard, Exception ex) {
                // NOOP
+               
        }
 
 }
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 2e84ca7..557fb50 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.time.Duration;
+import java.util.Date;
 import java.util.UUID;
 import java.util.function.Consumer;
 
@@ -130,7 +131,8 @@ public class BookKeeperTest {
     @Test
     public void testPackageImport() throws DistributionException {
         try {
-            
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis(), currentTimeMillis());
+            Date createdTime = new Date(currentTimeMillis());
+                       
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
createdTime, createdTime);
         } finally {
             assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
         }
@@ -145,7 +147,8 @@ public class BookKeeperTest {
         
         for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) {
             try {
-                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis(), currentTimeMillis());
+                Date createdTime = new Date(currentTimeMillis());
+                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
createdTime, createdTime);
             } catch (DistributionException e) {
             }
         }
@@ -169,7 +172,7 @@ public class BookKeeperTest {
             
         }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         
-        long simulatedStartTime = currentTimeMillis() - 
Duration.ofMinutes(6).toMillis();
+        Date simulatedStartTime = new Date( currentTimeMillis() - 
Duration.ofMinutes(6).toMillis( ));
         
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
simulatedStartTime, simulatedStartTime);
         
         assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L));
@@ -191,8 +194,8 @@ public class BookKeeperTest {
             
         }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         
-        long simulatedStartTime = currentTimeMillis() - 
Duration.ofMinutes(1).toMillis();
-        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
currentTimeMillis(), simulatedStartTime);
+        Date simulatedStartTime = new Date( currentTimeMillis() - 
Duration.ofMinutes(1).toMillis());
+        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
new Date(currentTimeMillis()), simulatedStartTime);
         
         assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L));
     }
@@ -200,8 +203,8 @@ public class BookKeeperTest {
     @Test
     public void testCacheInvalidation() throws DistributionException {
         try {
-            long simulatedStartTime = currentTimeMillis() - 
Duration.ofMinutes(1).toMillis();
-            
bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE),
 10, simulatedStartTime);
+               Date simulatedStartTime = new Date( currentTimeMillis() - 
Duration.ofMinutes(1).toMillis());
+            
bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE),
 10L, simulatedStartTime, simulatedStartTime);
         } finally {
             assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
         }

Reply via email to