This is an automated email from the ASF dual-hosted git repository. cschneider pushed a commit to branch GRANITE-39892 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit d28b90c1bb3a26505bc3903e6e1f623d98e18b41 Author: Christian Schneider <cschn...@adobe.com> AuthorDate: Thu Nov 30 14:30:30 2023 +0100 GRANITE-39892 - Simplify DistributionPublisher --- .../journal/impl/publisher/AgentState.java | 2 + .../impl/publisher/DistributionPublisher.java | 63 ++++++++-------------- 2 files changed, 24 insertions(+), 41 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java index b0cd53e..644db32 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.stream.Stream; import java.util.stream.StreamSupport; +import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; import org.apache.sling.distribution.agent.DistributionAgentState; @@ -40,6 +41,7 @@ public class AgentState { private AgentState() { } + @Nonnull public static DistributionAgentState getState(DistributionAgent agent) { boolean empty = queueStatuses(agent).noneMatch(AgentState::queueNotEmpty); if (empty) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java index 34f5851..a81a210 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java @@ -31,17 +31,16 @@ import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; -import java.util.function.ToLongFunction; import javax.annotation.Nonnull; import javax.annotation.ParametersAreNonnullByDefault; import javax.management.NotCompliantMBeanException; import org.apache.commons.io.IOUtils; -import org.apache.sling.distribution.DistributionResponseInfo; import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService; import org.apache.sling.distribution.journal.impl.event.DistributionEvent; import org.apache.sling.distribution.journal.messages.PackageMessage; +import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.journal.queue.PubQueueProvider; import org.apache.sling.distribution.journal.shared.DefaultDistributionLog; @@ -52,7 +51,6 @@ import org.apache.sling.distribution.journal.shared.Topics; import org.apache.sling.api.resource.ResourceResolver; import org.apache.sling.distribution.DistributionRequest; import org.apache.sling.distribution.DistributionRequestState; -import org.apache.sling.distribution.DistributionRequestType; import org.apache.sling.distribution.DistributionResponse; import org.apache.sling.distribution.agent.DistributionAgentState; import org.apache.sling.distribution.agent.spi.DistributionAgent; @@ -73,7 +71,6 @@ import org.osgi.service.metatype.annotations.Designate; import org.apache.sling.distribution.journal.MessagingProvider; import org.apache.sling.distribution.journal.Reset; import org.apache.sling.distribution.journal.HandlerAdapter; -import org.apache.sling.distribution.journal.JournalAvailable; /** * A Publisher SCD agent which produces messages to be consumed by a {@code DistributionSubscriber} agent. @@ -89,8 +86,7 @@ public class DistributionPublisher implements DistributionAgent { public static final String FACTORY_PID = "org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"; - private final EnumMap<DistributionRequestType, ToLongFunction<PackageMessage>> reqTypes = new EnumMap<>(DistributionRequestType.class); - + @Nonnull private final DefaultDistributionLog log; @Reference @@ -136,10 +132,6 @@ public class DistributionPublisher implements DistributionAgent { public DistributionPublisher() { log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO); - reqTypes.put(ADD, this::sendAndWait); - reqTypes.put(DELETE, this::sendAndWait); - reqTypes.put(INVALIDATE, this::sendAndWait); - reqTypes.put(TEST, this::send); } @Activate @@ -205,6 +197,7 @@ public class DistributionPublisher implements DistributionAgent { /** * Get queue names for alive subscribed subscriber agents. */ + @SuppressWarnings("null") @Nonnull @Override public Iterable<String> getQueueNames() { @@ -242,24 +235,22 @@ public class DistributionPublisher implements DistributionAgent { public DistributionResponse execute(ResourceResolver resourceResolver, DistributionRequest request) throws DistributionException { - ToLongFunction<PackageMessage> handler = reqTypes.get(request.getRequestType()); - if (handler != null) { - return execute(resourceResolver, request, handler); - } else { - return executeUnsupported(request); + if (request.getRequestType() == PULL) { + String msg = "Request requestType=PULL not supported by this agent"; + log.info(msg); + return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg); } + final PackageMessage pkg = buildPackage(resourceResolver, request); + return sendPackageMessage(pkg); } - private DistributionResponse execute(ResourceResolver resourceResolver, - DistributionRequest request, - ToLongFunction<PackageMessage> sender) + private PackageMessage buildPackage(ResourceResolver resourceResolver, DistributionRequest request) throws DistributionException { - final PackageMessage pkg; try { if (request.getRequestType() != TEST && request.getPaths().length == 0) { throw new DistributionException("Empty paths are not allowed"); } - pkg = timed(distributionMetricsService.getBuildPackageDuration(), () -> factory.create(packageBuilder, resourceResolver, pubAgentName, request)); + return timed(distributionMetricsService.getBuildPackageDuration(), () -> factory.create(packageBuilder, resourceResolver, pubAgentName, request)); } catch (Exception e) { distributionMetricsService.getDroppedRequests().mark(); String msg = format("Failed to create content package for requestType=%s, paths=%s. Error=%s", @@ -267,19 +258,17 @@ public class DistributionPublisher implements DistributionAgent { log.error(msg, e); throw new DistributionException(msg, e); } - + } + + @Nonnull + private DistributionResponse sendPackageMessage(final PackageMessage pkg) throws DistributionException { try { - long offset = timed(distributionMetricsService.getEnqueuePackageDuration(), () -> sender.applyAsLong(pkg)); + long offset = timed(distributionMetricsService.getEnqueuePackageDuration(), () -> this.sendAndWait(pkg)); distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength()); distributionMetricsService.getAcceptedRequests().mark(); String msg = format("Request accepted with distribution package %s at offset=%s", pkg, offset); log.info(msg); - return new SimpleDistributionResponse(ACCEPTED, msg, new DistributionResponseInfo() { - @Nonnull @Override - public String getId() { - return pkg.getPkgId(); - } - }); + return new SimpleDistributionResponse(ACCEPTED, msg, pkg::getPkgId); } catch (Throwable e) { distributionMetricsService.getDroppedRequests().mark(); String msg = format("Failed to append distribution package %s to the journal", pkg); @@ -291,13 +280,13 @@ public class DistributionPublisher implements DistributionAgent { } } } - - private long send(PackageMessage pkg) { - sender.accept(pkg); - return -1; - } private long sendAndWait(PackageMessage pkg) { + if (pkg.getReqType() == ReqType.TEST) { + // Do not wait in case of TEST as we do not actually send it out + sender.accept(pkg); + return -1; + } PackageQueuedNotifier queuedNotifier = pubQueueProvider.getQueuedNotifier(); try { CompletableFuture<Long> received = queuedNotifier.registerWait(pkg.getPkgId()); @@ -311,12 +300,4 @@ public class DistributionPublisher implements DistributionAgent { } } - @Nonnull - private DistributionResponse executeUnsupported(DistributionRequest request) { - String msg = format("Request requestType=%s not supported by this agent, expected one of %s", - request.getRequestType(), reqTypes.keySet()); - log.info(msg); - return new SimpleDistributionResponse(DistributionRequestState.DROPPED, msg); - } - }