This is an automated email from the ASF dual-hosted git repository.
cschneider pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
The following commit(s) were added to refs/heads/master by this push:
new 3c7a5d8 SLING-12689 - Add more parameters to callback (#171)
3c7a5d8 is described below
commit 3c7a5d8a4542480ff93dfbf745b28bb982b1f68e
Author: Christian Schneider <[email protected]>
AuthorDate: Fri Mar 28 08:11:08 2025 +0100
SLING-12689 - Add more parameters to callback (#171)
---
.../journal/bookkeeper/BookKeeper.java | 26 +++++++++++-----------
.../impl/subscriber/DistributionSubscriber.java | 8 ++++---
.../impl/subscriber/NoopDistributionCallback.java | 10 +++++++--
.../journal/bookkeeper/BookKeeperTest.java | 17 ++++++++------
4 files changed, 36 insertions(+), 25 deletions(-)
diff --git
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index a2f9866..a9d6c29 100644
---
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -27,6 +27,7 @@ import static
org.apache.sling.distribution.event.DistributionEventProperties.*;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
+import java.time.Duration;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -155,13 +156,13 @@ public class BookKeeper {
* failing. For those packages importers, we aim at processing packages at
least
* once, thanks to the order in which the content updates are applied.
*/
- public void importPackage(PackageMessage pkgMsg, long offset, long
createdTime, long importStartTime) throws DistributionException {
+ public void importPackage(PackageMessage pkgMsg, long offset, Date
createdTime, Date importStartTime) throws DistributionException {
log.debug("Importing distribution package {} at offset={}", pkgMsg,
offset);
try (Timer.Context context =
subscriberMetrics.getImportedPackageDuration().time();
ResourceResolver importerResolver =
getServiceResolver(SUBSERVICE_IMPORTER)) {
// Execute the pre-processor
preProcess(pkgMsg);
- subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg,
offset, importStartTime));
+ subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg,
offset, importStartTime.getTime()));
packageHandler.apply(importerResolver, pkgMsg);
if (config.isEditable()) {
storeStatus(importerResolver, new
PackageStatus(Status.IMPORTED, offset, pkgMsg.getPubAgentName()));
@@ -169,7 +170,7 @@ public class BookKeeper {
storeOffset(importerResolver, offset);
importerResolver.commit();
subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
-
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() -
createdTime), TimeUnit.MILLISECONDS);
+
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() -
createdTime.getTime()), TimeUnit.MILLISECONDS);
// Execute the post-processor
postProcess(pkgMsg);
@@ -178,19 +179,18 @@ public class BookKeeper {
Event event = new AppliedEvent(pkgMsg,
config.getSubAgentName()).toEvent();
eventAdmin.postEvent(event);
- long currentImporturationMs = System.currentTimeMillis() -
importStartTime;
- Date createdDate = new Date(createdTime);
- log.info("Imported distribution package {} at offset={} took
importDurationMs={} created={}", pkgMsg, offset, currentImporturationMs,
createdDate);
+ Duration currentImporturation =
Duration.ofMillis(System.currentTimeMillis() - importStartTime.getTime());
+ log.info("Imported distribution package {} at offset={} took
importDurationMs={} created={}", pkgMsg, offset,
currentImporturation.toMillis(), createdTime);
subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(),
Status.IMPORTED).increment();
- distributionCallback.success(pkgMsg);
+ distributionCallback.success(pkgMsg, offset, createdTime,
currentImporturation);
} catch (DistributionException | LoginException | IOException |
RuntimeException | ImportPreProcessException |ImportPostProcessException e) {
- failure(pkgMsg, offset, e);
+ failure(pkgMsg, offset, createdTime, e);
} finally {
subscriberMetrics.clearCurrentImport();
}
}
- public void invalidateCache(PackageMessage pkgMsg, long offset, long
importStartTime) throws DistributionException {
+ public void invalidateCache(PackageMessage pkgMsg, long offset, Date
createdTime, Date importStartTime) throws DistributionException {
log.debug("Invalidating the cache for the package {} at offset={}",
pkgMsg, offset);
try (ResourceResolver resolver =
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
Map<String, Object> props =
this.buildProcessorPropertiesFromMessage(pkgMsg);
@@ -211,14 +211,14 @@ public class BookKeeper {
Event event = new AppliedEvent(pkgMsg,
config.getSubAgentName()).toEvent();
eventAdmin.postEvent(event);
- long currentImporturationMs = System.currentTimeMillis() -
importStartTime;
+ long currentImporturationMs = System.currentTimeMillis() -
importStartTime.getTime();
log.info("Invalidated the cache for the package {} at offset={}.
This took importDurationMs={}", pkgMsg, offset, currentImporturationMs);
subscriberMetrics.getPackageStatusCounter(pkgMsg.getPubAgentName(),
Status.IMPORTED).increment();
subscriberMetrics.getInvalidationProcessDuration().update((currentTimeMillis()
- invalidationStartTime), TimeUnit.MILLISECONDS);
subscriberMetrics.getInvalidationProcessSuccess().increment();
} catch (LoginException | PersistenceException |
InvalidationProcessException | RuntimeException e) {
- failure(pkgMsg, offset, e);
+ failure(pkgMsg, offset, createdTime, e);
}
}
@@ -270,7 +270,7 @@ public class BookKeeper {
*
* @throws DistributionException if the package should be retried
*/
- private void failure(PackageMessage pkgMsg, long offset, Exception e)
throws DistributionException {
+ private void failure(PackageMessage pkgMsg, long offset, Date createdTime,
Exception e) throws DistributionException {
subscriberMetrics.getFailedPackageImports().mark();
String pubAgentName = pkgMsg.getPubAgentName();
@@ -285,7 +285,7 @@ public class BookKeeper {
} catch (Exception e2) {
log.warn("Error sending log message", e2);
}
- distributionCallback.failure(pkgMsg, retries, giveUp, e);
+ distributionCallback.failure(pkgMsg, retries, createdTime, retries,
giveUp, e);
if (giveUp) {
log.warn(msg, e);
removeFailedPackage(pkgMsg, offset);
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 234b35a..0e8d389 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -33,6 +33,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -324,13 +325,14 @@ public class DistributionSubscriber {
try {
this.state.set(DistributionAgentState.RUNNING);
idleCheck.busy(bookKeeper.getRetries(pkgMsg.getPubAgentName()),
info.getCreateTime());
- long importStartTime = System.currentTimeMillis();
+ Date importStartTime = new Date();
+ Date createdTime = new Date(info.getCreateTime());
if (skip) {
bookKeeper.removePackage(pkgMsg, info.getOffset());
} else if (type == INVALIDATE) {
- bookKeeper.invalidateCache(pkgMsg, info.getOffset(),
importStartTime);
+ bookKeeper.invalidateCache(pkgMsg, info.getOffset(),
createdTime, importStartTime);
} else {
- bookKeeper.importPackage(pkgMsg, info.getOffset(),
info.getCreateTime(), importStartTime);
+ bookKeeper.importPackage(pkgMsg, info.getOffset(),
createdTime, importStartTime);
}
blockingSendStoredStatus();
} finally {
diff --git
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
index 752201b..33b954a 100644
---
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
+++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/NoopDistributionCallback.java
@@ -18,6 +18,9 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import java.time.Duration;
+import java.util.Date;
+
import org.apache.sling.distribution.journal.DistributionCallback;
import org.apache.sling.distribution.journal.messages.PackageMessage;
import org.osgi.service.component.annotations.Component;
@@ -25,14 +28,17 @@ import org.osgi.service.component.annotations.Component;
@Component
public class NoopDistributionCallback implements DistributionCallback {
+
@Override
- public void success(PackageMessage packageMessage) {
+ public void success(PackageMessage packageMessage, long offset, Date
createdDate, Duration importuration) {
// NOOP
}
@Override
- public void failure(PackageMessage packageMessage, int numRetries,
boolean willDiscard, Exception ex) {
+ public void failure(PackageMessage packageMessage, long offset, Date
createdDate, int numRetries,
+ boolean willDiscard, Exception ex) {
// NOOP
+
}
}
diff --git
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index 2e84ca7..557fb50 100644
---
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -29,6 +29,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.time.Duration;
+import java.util.Date;
import java.util.UUID;
import java.util.function.Consumer;
@@ -130,7 +131,8 @@ public class BookKeeperTest {
@Test
public void testPackageImport() throws DistributionException {
try {
-
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
currentTimeMillis(), currentTimeMillis());
+ Date createdTime = new Date(currentTimeMillis());
+
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
createdTime, createdTime);
} finally {
assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
}
@@ -145,7 +147,8 @@ public class BookKeeperTest {
for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) {
try {
-
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
currentTimeMillis(), currentTimeMillis());
+ Date createdTime = new Date(currentTimeMillis());
+
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
createdTime, createdTime);
} catch (DistributionException e) {
}
}
@@ -169,7 +172,7 @@ public class BookKeeperTest {
}).when(packageHandler).apply(Mockito.any(ResourceResolver.class),
Mockito.any(PackageMessage.class));
- long simulatedStartTime = currentTimeMillis() -
Duration.ofMinutes(6).toMillis();
+ Date simulatedStartTime = new Date( currentTimeMillis() -
Duration.ofMinutes(6).toMillis( ));
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
simulatedStartTime, simulatedStartTime);
assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L));
@@ -191,8 +194,8 @@ public class BookKeeperTest {
}).when(packageHandler).apply(Mockito.any(ResourceResolver.class),
Mockito.any(PackageMessage.class));
- long simulatedStartTime = currentTimeMillis() -
Duration.ofMinutes(1).toMillis();
-
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
currentTimeMillis(), simulatedStartTime);
+ Date simulatedStartTime = new Date( currentTimeMillis() -
Duration.ofMinutes(1).toMillis());
+
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10,
new Date(currentTimeMillis()), simulatedStartTime);
assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L));
}
@@ -200,8 +203,8 @@ public class BookKeeperTest {
@Test
public void testCacheInvalidation() throws DistributionException {
try {
- long simulatedStartTime = currentTimeMillis() -
Duration.ofMinutes(1).toMillis();
-
bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE),
10, simulatedStartTime);
+ Date simulatedStartTime = new Date( currentTimeMillis() -
Duration.ofMinutes(1).toMillis());
+
bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE),
10L, simulatedStartTime, simulatedStartTime);
} finally {
assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
}