This is an automated email from the ASF dual-hosted git repository. joerghoh pushed a commit to branch SLING-13021-concurrent-import in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit bac3ed9621eafa513352b28bbf5f2c2ad6d0d13b Author: Joerg Hoh <[email protected]> AuthorDate: Mon Dec 1 19:14:19 2025 +0100 SLING-13201 delegate the import to an executor --- .../impl/subscriber/DistributionSubscriber.java | 86 ++++++++++++++++++---- .../impl/subscriber/SubscriberConfiguration.java | 3 + .../journal/impl/subscriber/SubscriberTest.java | 7 +- 3 files changed, 81 insertions(+), 15 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index bda2a83..b056490 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -37,8 +37,17 @@ import java.util.Collections; import java.util.Date; import java.util.Map; import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.CompletionService; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiConsumer; import java.util.function.Consumer; @@ -130,6 +139,10 @@ public class DistributionSubscriber { private volatile boolean running = true; private LongSupplier catchAllDelay = catchAllDelays.get(); + + private CompletionService<PackageMessageResult> completionService; + private ExecutorService importExecutor; + private AtomicLong totalNumberOfImportedMessages = new AtomicLong(0); private final Delay delay = new Delay(); private AtomicReference<DistributionAgentState> state = new AtomicReference<DistributionAgentState>(DistributionAgentState.IDLE); @@ -192,15 +205,32 @@ public class DistributionSubscriber { String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null; packagePoller = messagingProvider.createPoller(Topics.PACKAGE_TOPIC, Reset.latest, assign, - HandlerAdapter.create(PackageMessage.class, this::handlePackageMessage), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); + HandlerAdapter.create(PackageMessage.class, this::delegateMessageToExecutor), HandlerAdapter.create(OffsetMessage.class, this::handleOffsetMessage)); int announceDelay = Converters.standardConverter().convert(properties.get("announceDelay")).defaultValue(10000).to(Integer.class); announcer = new Announcer(subSlingId, subAgentName, queueNames, messagingProvider.createSender(Topics.DISCOVERY_TOPIC), bookKeeper, config.maxRetries(), config.editable(), announceDelay); - - LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}", subAgentName, startOffset, - queueNames, config.subscriberIdleCheck()); + + importExecutor = Executors.newFixedThreadPool(config.concurrentImportingThreads(), new ThreadFactory() { + + AtomicInteger id = new AtomicInteger(0); + + public Thread newThread(Runnable r) { + int no = id.incrementAndGet(); + Thread t = new Thread(r); + t.setName("DistributionSubscriber-importer-" + no); + return t; + } + }); + completionService = new ExecutorCompletionService<>(importExecutor); + + LOG.info("Started Subscriber agent={} at offset={}, subscribed to agent names {}, readyCheck={}, {} importing threads", + subAgentName, + startOffset, + queueNames, + config.subscriberIdleCheck(), + config.concurrentImportingThreads()); } private String getFirst(String[] agentNames) { @@ -235,6 +265,7 @@ public class DistributionSubscriber { IOUtils.closeQuietly(announcer, packagePoller, idleReadyCheck, idleCheck, commandPoller); running = false; + importExecutor.shutdown(); LOG.info("Stopped Subscriber agent {}, subscribed to Publisher agent names {} with package builder {}", subAgentName, queueNames, pkgType); } @@ -244,21 +275,33 @@ public class DistributionSubscriber { return (isBlocked) ? DistributionAgentState.BLOCKED : state.get(); } - private void handlePackageMessage(MessageInfo info, PackageMessage message) { - boolean done = false; - while (!done && running) { - done = tryProcess(info, message); + private void delegateMessageToExecutor (MessageInfo info, PackageMessage message) { + // TODO: do we need that completionService? It will store all submitted futures + // and we should take() them eventually ... + completionService.submit(() -> handlePackageMessage(info, message)); + totalNumberOfImportedMessages.incrementAndGet(); + } + + private PackageMessageResult handlePackageMessage(MessageInfo info, PackageMessage message) { + PackageMessageResult result = null; + boolean done = false; + while (!done && running) { + result = tryProcess(info,message); + done = result.success; } + return result; } - public boolean tryProcess(MessageInfo info, PackageMessage message) { + public PackageMessageResult tryProcess(MessageInfo info, PackageMessage message) { + PackageMessageResult result = new PackageMessageResult(); if (shouldSkip(info, message)) { try { bookKeeper.skipPackage(info.getOffset()); } catch (PersistenceException | LoginException e) { LOG.warn("Error marking distribution package {} at offset={} as skipped", message, info.getOffset(), e); } - return true; + result.success = true; + return result; } subscriberMetrics.getPackageJournalDistributionDuration() .update((currentTimeMillis() - info.getCreateTime()), TimeUnit.MILLISECONDS); @@ -269,7 +312,8 @@ public class DistributionSubscriber { // Precondition timed out. We only log this on info level as it is no error LOG.info(e.getMessage()); delay.await(RETRY_DELAY_MILLIS); - return false; + result.success = false; + return result; } catch (InterruptedException e) { Thread.currentThread().interrupt(); LOG.debug(e.getMessage()); @@ -277,11 +321,13 @@ public class DistributionSubscriber { // Catch all to prevent processing from stopping LOG.error("Error processing queue item", e); delay.await(catchAllDelay.getAsLong()); - return false; + result.success = false; + return result; } finally { announcer.run(); } - return true; + result.success = true; + return result; } private void handleOffsetMessage(MessageInfo info, OffsetMessage message) { @@ -372,4 +418,18 @@ public class DistributionSubscriber { throw new PreConditionTimeoutException(msg); } + // This is just for testing -- wait that all async messages have been imported + public void waitForAllMessagesBeingImported() throws InterruptedException { + for (int i=0; i < totalNumberOfImportedMessages.get();i++) { + completionService.take(); + } + } + + + class PackageMessageResult { + + public boolean success; + + } + } diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java index bcf2e0b..d225948 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java +++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java @@ -70,4 +70,7 @@ public @interface SubscriberConfiguration { @AttributeDefinition(description = "Number of ms to wait before retrying to process a package.") int acceptableAgeDiffMs() default 120 * 1000; + + @AttributeDefinition(description = "Number of threads importing content concurrently") + int concurrentImportingThreads() default 1; } diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java index 1bdbe19..88ade05 100644 --- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java @@ -238,7 +238,7 @@ public class SubscriberTest { } @Test - public void testReceiveNotSubscribed() throws DistributionException { + public void testReceiveNotSubscribed() throws DistributionException, InterruptedException { assumeNoPrecondition(); initSubscriber(Collections.singletonMap("agentNames", "dummy")); assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE)); @@ -253,6 +253,7 @@ public class SubscriberTest { for (int c=0; c < BookKeeper.COMMIT_AFTER_NUM_SKIPPED; c++) { packageHandler.handle(info, message); } + subscriber.waitForAllMessagesBeingImported(); assertThat(getStoredOffset(), equalTo(100l)); } @@ -271,7 +272,7 @@ public class SubscriberTest { } @Test - public void testImportPreAndPostProcessInvoked() throws DistributionException, ImportPostProcessException, ImportPreProcessException { + public void testImportPreAndPostProcessInvoked() throws DistributionException, ImportPostProcessException, ImportPreProcessException, InterruptedException { assumeNoPrecondition(); initSubscriber(); assertThat(subscriber.getState(), equalTo(DistributionAgentState.IDLE)); @@ -287,6 +288,8 @@ public class SubscriberTest { props.put(DISTRIBUTION_PACKAGE_ID, message.getPkgId()); props.put(DISTRIBUTION_COMPONENT_NAME, message.getPubAgentName()); + subscriber.waitForAllMessagesBeingImported(); + verify(importPreProcessor, times(1)).process(props); verify(importPostProcessor, times(1)).process(props); }
