This is an automated email from the ASF dual-hosted git repository.

joerghoh pushed a commit to branch SLING-13021-2
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit 5c02d0bcb1aaebc24e5ef2f7c7e1d617339bf35c
Author: Joerg Hoh <[email protected]>
AuthorDate: Tue Feb 10 16:49:07 2026 +0100

    SLING-13021 convert to import async in an executor
---
 .../impl/subscriber/DistributionSubscriber.java    | 44 ++++++++++++++++++++--
 .../impl/subscriber/SubscriberConfiguration.java   |  3 ++
 2 files changed, 44 insertions(+), 3 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index ea4479b..f678714 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -37,8 +37,12 @@ import java.util.Collections;
 import java.util.Date;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.BiConsumer;
 import java.util.function.Consumer;
@@ -134,6 +138,8 @@ public class DistributionSubscriber {
     private final Delay delay = new Delay();
        private AtomicReference<DistributionAgentState> state = new 
AtomicReference<DistributionAgentState>(DistributionAgentState.IDLE);
        
+       private ExecutorService importExecutor;
+       
        @Activate
        public DistributionSubscriber(
                        @Reference(name = "packageBuilder") 
DistributionPackageBuilder packageBuilder,
@@ -192,15 +198,29 @@ public class DistributionSubscriber {
         String assign = startOffset > 0 ? 
messagingProvider.assignTo(startOffset) : null;
 
         packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, 
Reset.latest, assign,
-                HandlerAdapter.create(PackageMessage.class, 
this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, 
this::handleOffsetMessage));
+                HandlerAdapter.create(PackageMessage.class, 
this::delegatePackageMessageToExecutor), 
HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage));
 
         int announceDelay = 
Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class);
         announcer = new Announcer(subSlingId, subAgentName, queueNames,
                 messagingProvider.createSender(Topics.DISCOVERY_TOPIC), 
bookKeeper,
                 config.maxRetries(), config.editable(), announceDelay);
+        
+        importExecutor = 
Executors.newFixedThreadPool(config.concurrentImporterThreads(), new 
ThreadFactory() {
 
-        LOG.info("Started Subscriber agent={} at offset={}, subscribed to 
agent names {}, readyCheck={}", subAgentName, startOffset,
-                queueNames, config.subscriberIdleCheck());
+            AtomicInteger id = new AtomicInteger(0);
+
+            @Override
+            public Thread newThread(Runnable r) {
+                int no = id.incrementAndGet();
+                Thread t = new Thread(r);
+                t.setName("DistributionSubscriber-importer-" + no);
+                return t;
+            }
+        });
+
+        LOG.info("Started Subscriber agent={} at offset={}, subscribed to 
agent names {}, readyCheck={}, concurrent importer threads={}", 
+                subAgentName, startOffset, queueNames, 
config.subscriberIdleCheck(),
+                config.concurrentImporterThreads());
     }
 
     private String getFirst(String[] agentNames) {
@@ -235,6 +255,17 @@ public class DistributionSubscriber {
 
         IOUtils.closeQuietly(announcer, packagePoller, idleReadyCheck, 
idleCheck, commandPoller);
         running = false;
+        if (importExecutor != null) {
+            importExecutor.shutdown();
+            try {
+                if (!importExecutor.awaitTermination(1, MINUTES)) {
+                    importExecutor.shutdownNow();
+                }
+            } catch (InterruptedException e) {
+                LOG.error("Not handling an InterruptedException during 
shutdown", e);
+                importExecutor.shutdownNow();
+            }
+        }
         LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent 
names {} with package builder {}",
                 subAgentName, queueNames, pkgType);
     }
@@ -244,6 +275,13 @@ public class DistributionSubscriber {
         return (isBlocked) ? DistributionAgentState.BLOCKED : state.get();
     }
 
+    /**
+     * Hands an incoming package message over to {@code importExecutor} so that
+     * {@code handlePackageMessage} runs on one of the executor's threads instead
+     * of on the thread that delivered the message. This method only submits the
+     * task; it does not wait for the import to finish.
+     *
+     * NOTE(review): the Future returned by submit() is discarded, so an exception
+     * escaping handlePackageMessage would not be surfaced to the caller -- confirm
+     * that handlePackageMessage handles and logs its own failures.
+     * NOTE(review): if the executor is configured with more than one thread,
+     * messages may be handled concurrently rather than strictly in arrival
+     * order -- confirm this is intended.
+     *
+     * @param info message info, forwarded unchanged to handlePackageMessage
+     * @param message package message, forwarded unchanged to handlePackageMessage
+     */
+    private void delegatePackageMessageToExecutor(MessageInfo info, 
PackageMessage message) {
+        importExecutor.submit(() -> handlePackageMessage(info, message));
+    }
+
     private void handlePackageMessage(MessageInfo info, PackageMessage 
message) {
        boolean done = false;
        while (!done && running) {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index bcf2e0b..4940798 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -70,4 +70,7 @@ public @interface SubscriberConfiguration {
 
     @AttributeDefinition(description = "Number of ms to wait before retrying 
to process a package.")
     int acceptableAgeDiffMs() default 120 * 1000;
+    
+    @AttributeDefinition(description = "Number of threads importing packages 
(default 1)")
+    int concurrentImporterThreads() default 1;
 }

Reply via email to