This is an automated email from the ASF dual-hosted git repository. joerghoh pushed a commit to branch SLING-13021-2 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit 5c02d0bcb1aaebc24e5ef2f7c7e1d617339bf35c Author: Joerg Hoh <[email protected]> AuthorDate: Tue Feb 10 16:49:07 2026 +0100 SLING-13021 convert to import async in an executor --- .../impl/subscriber/DistributionSubscriber.java | 44 ++++++++++++++++++++-- .../impl/subscriber/SubscriberConfiguration.java | 3 ++ 2 files changed, 44 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index ea4479b..f678714 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -37,8 +37,12 @@ import java.util.Collections; import java.util.Date; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -134,6 +138,8 @@ public class DistributionSubscriber { private final Delay delay = new Delay(); private AtomicReference<DistributionAgentState> state = new AtomicReference<DistributionAgentState>(DistributionAgentState.IDLE); + private ExecutorService importExecutor; + @Activate public DistributionSubscriber( @Reference(name = "packageBuilder") DistributionPackageBuilder packageBuilder, @@ -192,15 +198,29 @@ public class DistributionSubscriber { String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null; packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, Reset.latest, assign, - HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); + HandlerAdapter.create(PackageMessage.class, this::delegatePackageMessageToExecutor), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); int announceDelay = Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class); announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(Topics.DISCOVERY_TOPIC), bookKeeper, config.maxRetries(), config.editable(), announceDelay); + + importExecutor = Executors.newFixedThreadPool(config.concurrentImporterThreads(), new ThreadFactory() { - LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", subAgentName, startOffset, - queueNames, config.subscriberIdleCheck()); + AtomicInteger id = new AtomicInteger(0); + + @Override + public Thread newThread(Runnable r) { + int no = id.incrementAndGet(); + Thread t = new Thread(r); + t.setName("DistributionSubscriber-importer-" + no); + return t; + } + }); + + LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}, concurrent importer threads={}", + subAgentName, startOffset, queueNames, config.subscriberIdleCheck(), + config.concurrentImporterThreads()); } private String getFirst(String[] agentNames) { @@ -235,6 +255,17 @@ public class DistributionSubscriber { IOUtils.closeQuietly(announcer, packagePoller, idleReadyCheck, idleCheck, commandPoller); running = false; + if (importExecutor != null) { + importExecutor.shutdown(); + try { + if (!importExecutor.awaitTermination(1, MINUTES)) { + importExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + LOG.error("Not handling an InterruptedException during shutdown", e); + importExecutor.shutdownNow(); + } + } LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}", subAgentName, queueNames, pkgType); } @@ -244,6 +275,13 @@ public class DistributionSubscriber { return (isBlocked) ? DistributionAgentState.BLOCKED : state.get(); } + /** + * Delegates the rest of the execution into a thread of the executor for async execution + */ + private void delegatePackageMessageToExecutor(MessageInfo info, PackageMessage message) { + importExecutor.submit(() -> handlePackageMessage(info, message)); + } + private void handlePackageMessage(MessageInfo info, PackageMessage message) { boolean done = false; while (!done && running) { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java index bcf2e0b..4940798 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java @@ -70,4 +70,7 @@ public @interface SubscriberConfiguration { @AttributeDefinition(description = "Number of ms to wait before retrying to process a package.") int acceptableAgeDiffMs() default 120 * 1000; + + @AttributeDefinition(description = "Number of threads importing packages (default 1)") + int concurrentImporterThreads() default 1; }
