This is an automated email from the ASF dual-hosted git repository.

cschneider pushed a commit to branch GRANITE-39892
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit d28b90c1bb3a26505bc3903e6e1f623d98e18b41
Author: Christian Schneider <cschn...@adobe.com>
AuthorDate: Thu Nov 30 14:30:30 2023 +0100

    GRANITE-39892 - Simplify DistributionPublisher
---
 .../journal/impl/publisher/AgentState.java         |  2 +
 .../impl/publisher/DistributionPublisher.java      | 63 ++++++++--------------
 2 files changed, 24 insertions(+), 41 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
index b0cd53e..644db32 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/AgentState.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
 import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -40,6 +41,7 @@ public class AgentState {
     private AgentState() {
     }
 
+    @Nonnull
     public static DistributionAgentState getState(DistributionAgent agent) {
         boolean empty = 
queueStatuses(agent).noneMatch(AgentState::queueNotEmpty);
         if (empty) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 34f5851..a81a210 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -31,17 +31,16 @@ import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
-import java.util.function.ToLongFunction;
 
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 import javax.management.NotCompliantMBeanException;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.sling.distribution.DistributionResponseInfo;
 import org.apache.sling.distribution.journal.impl.discovery.DiscoveryService;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
+import org.apache.sling.distribution.journal.messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.journal.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.shared.DefaultDistributionLog;
@@ -52,7 +51,6 @@ import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
-import org.apache.sling.distribution.DistributionRequestType;
 import org.apache.sling.distribution.DistributionResponse;
 import org.apache.sling.distribution.agent.DistributionAgentState;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
@@ -73,7 +71,6 @@ import org.osgi.service.metatype.annotations.Designate;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
 import org.apache.sling.distribution.journal.HandlerAdapter;
-import org.apache.sling.distribution.journal.JournalAvailable;
 
 /**
  * A Publisher SCD agent which produces messages to be consumed by a {@code 
DistributionSubscriber} agent.
@@ -89,8 +86,7 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     public static final String FACTORY_PID = 
"org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory";
 
-    private final EnumMap<DistributionRequestType, 
ToLongFunction<PackageMessage>> reqTypes = new 
EnumMap<>(DistributionRequestType.class);
-
+    @Nonnull
     private final DefaultDistributionLog log;
 
     @Reference
@@ -136,10 +132,6 @@ public class DistributionPublisher implements 
DistributionAgent {
 
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), 
DefaultDistributionLog.LogLevel.INFO);
-        reqTypes.put(ADD,        this::sendAndWait);
-        reqTypes.put(DELETE,     this::sendAndWait);
-        reqTypes.put(INVALIDATE, this::sendAndWait);
-        reqTypes.put(TEST,       this::send);
     }
 
     @Activate
@@ -205,6 +197,7 @@ public class DistributionPublisher implements 
DistributionAgent {
     /**
      * Get queue names for alive subscribed subscriber agents.
      */
+    @SuppressWarnings("null")
     @Nonnull
     @Override
     public Iterable<String> getQueueNames() {
@@ -242,24 +235,22 @@ public class DistributionPublisher implements 
DistributionAgent {
     public DistributionResponse execute(ResourceResolver resourceResolver,
                                         DistributionRequest request)
             throws DistributionException {
-        ToLongFunction<PackageMessage> handler = 
reqTypes.get(request.getRequestType());
-        if (handler != null) {
-            return execute(resourceResolver, request, handler);
-        } else {
-            return executeUnsupported(request);
+        if (request.getRequestType() == PULL) {
+            String msg = "Request requestType=PULL not supported by this 
agent";
+            log.info(msg);
+            return new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
         }
+        final PackageMessage pkg = buildPackage(resourceResolver, request);
+        return sendPackageMessage(pkg);
     }
 
-    private DistributionResponse execute(ResourceResolver resourceResolver,
-                                         DistributionRequest request,
-                                         ToLongFunction<PackageMessage> sender)
+    private PackageMessage buildPackage(ResourceResolver resourceResolver, 
DistributionRequest request)
             throws DistributionException {
-        final PackageMessage pkg;
         try {
             if (request.getRequestType() != TEST && request.getPaths().length 
== 0) {
                 throw new DistributionException("Empty paths are not allowed");
             }
-            pkg = timed(distributionMetricsService.getBuildPackageDuration(), 
() -> factory.create(packageBuilder, resourceResolver, pubAgentName, request));
+            return timed(distributionMetricsService.getBuildPackageDuration(), 
() -> factory.create(packageBuilder, resourceResolver, pubAgentName, request));
         } catch (Exception e) {
             distributionMetricsService.getDroppedRequests().mark();
             String msg = format("Failed to create content package for 
requestType=%s, paths=%s. Error=%s",
@@ -267,19 +258,17 @@ public class DistributionPublisher implements 
DistributionAgent {
             log.error(msg, e);
             throw new DistributionException(msg, e);
         }
-
+    }
+    
+    @Nonnull
+    private DistributionResponse sendPackageMessage(final PackageMessage pkg) 
throws DistributionException {
         try {
-            long offset = 
timed(distributionMetricsService.getEnqueuePackageDuration(), () -> 
sender.applyAsLong(pkg));
+            long offset = 
timed(distributionMetricsService.getEnqueuePackageDuration(), () -> 
this.sendAndWait(pkg));
             
distributionMetricsService.getExportedPackageSize().update(pkg.getPkgLength());
             distributionMetricsService.getAcceptedRequests().mark();
             String msg = format("Request accepted with distribution package %s 
at offset=%s", pkg, offset);
             log.info(msg);
-            return new SimpleDistributionResponse(ACCEPTED, msg, new 
DistributionResponseInfo() {
-                @Nonnull @Override 
-                public String getId() {
-                    return pkg.getPkgId();
-                }
-            });
+            return new SimpleDistributionResponse(ACCEPTED, msg, 
pkg::getPkgId);
         } catch (Throwable e) {
             distributionMetricsService.getDroppedRequests().mark();
             String msg = format("Failed to append distribution package %s to 
the journal", pkg);
@@ -291,13 +280,13 @@ public class DistributionPublisher implements 
DistributionAgent {
             }
         }
     }
-    
-    private long send(PackageMessage pkg) {
-        sender.accept(pkg);
-        return -1;
-    }
 
     private long sendAndWait(PackageMessage pkg) {
+        if (pkg.getReqType() == ReqType.TEST) {
+            // Do not wait in case of TEST as we do not actually send it out
+            sender.accept(pkg);
+            return -1;
+        }
         PackageQueuedNotifier queuedNotifier = 
pubQueueProvider.getQueuedNotifier();
         try {
             CompletableFuture<Long> received = 
queuedNotifier.registerWait(pkg.getPkgId());
@@ -311,12 +300,4 @@ public class DistributionPublisher implements 
DistributionAgent {
         }
     }
 
-    @Nonnull
-    private DistributionResponse executeUnsupported(DistributionRequest 
request) {
-        String msg = format("Request requestType=%s not supported by this 
agent, expected one of %s",
-                request.getRequestType(), reqTypes.keySet());
-        log.info(msg);
-        return new 
SimpleDistributionResponse(DistributionRequestState.DROPPED, msg);
-    }
-
 }

Reply via email to