This is an automated email from the ASF dual-hosted git repository. joerghoh pushed a commit to branch SLING-13021-2 in repository https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git
commit edfa958d9d9836a3103d73746ab122f4d17a3f1e Author: Joerg Hoh <[email protected]> AuthorDate: Tue Feb 10 14:48:03 2026 +0100 SLING-13021 implement offset handling for concurrent invocations --- .../journal/bookkeeper/BookKeeper.java | 55 +++++++- .../impl/subscriber/DistributionSubscriber.java | 4 +- .../journal/bookkeeper/BookKeeperTest.java | 144 ++++++++++++++++++++- 3 files changed, 191 insertions(+), 12 deletions(-) diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java index 0e5d14b..e65da7a 100644 --- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java +++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java @@ -30,8 +30,10 @@ import java.io.StringWriter; import java.time.Duration; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -48,6 +50,7 @@ import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessException; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.common.DistributionException; +import org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.impl.event.DistributionFailureEvent; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; @@ -78,6 +81,14 @@ import org.slf4j.LoggerFactory; * The clustered and non clustered publish instances use * cases can be supported by only running the Subscriber * agent on the leader instance. 
+ * + * + * The BookKeeper supports concurrent handling of packages. To do so, it + * keeps track of the messages submitted to importPackage() or + * invalidateCache(), as both can take a long time to complete. These in-flight messages + * are stored in the messagesBeingProcessed set; an offset is only persisted + * if it is the lowest offset of all messages in that set. + * */ public class BookKeeper { public static final String STORE_TYPE_STATUS = "statuses"; @@ -107,6 +118,9 @@ public class BookKeeper { private final ImportPostProcessor importPostProcessor; private final InvalidationProcessor invalidationProcessor; private int skippedCounter = 0; + + private Set<MessageInfo> messagesBeingProcessed = new HashSet<>(); + public BookKeeper(ResourceResolverFactory resolverFactory, SubscriberMetrics subscriberMetrics, PackageHandler packageHandler, EventAdmin eventAdmin, Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender, @@ -152,10 +166,12 @@ public class BookKeeper { * failing. For those packages importers, we aim at processing packages at least * once, thanks to the order in which the content updates are applied.
*/ - public void importPackage(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException { + public void importPackage(PackageMessage pkgMsg, MessageInfo message, Date createdTime, Date importStartTime) throws DistributionException { + long offset = message.getOffset(); log.debug("Importing distribution package {} at offset={}", pkgMsg, offset); try (Timer.Context context = subscriberMetrics.getImportedPackageDuration().time(); ResourceResolver importerResolver = getServiceResolver(SUBSERVICE_IMPORTER)) { + recordMessageProcessingStart(message); // Execute the pre-processor preProcess(pkgMsg); subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg, offset, importStartTime.getTime())); @@ -165,6 +181,7 @@ public class BookKeeper { } storeOffset(importerResolver, offset); importerResolver.commit(); + recordMessageProcessingCompleted(message); subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength()); subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - createdTime.getTime()), TimeUnit.MILLISECONDS); @@ -184,10 +201,20 @@ public class BookKeeper { subscriberMetrics.clearCurrentImport(); } } + + private synchronized void recordMessageProcessingStart(MessageInfo message) { + messagesBeingProcessed.add(message); + } + + private synchronized void recordMessageProcessingCompleted(MessageInfo message) { + messagesBeingProcessed.remove(message); + } - public void invalidateCache(PackageMessage pkgMsg, long offset, Date createdTime, Date importStartTime) throws DistributionException { + public void invalidateCache(PackageMessage pkgMsg, MessageInfo message, Date createdTime, Date importStartTime) throws DistributionException { + long offset = message.getOffset(); log.debug("Invalidating the cache for the package {} at offset={}", pkgMsg, offset); try (ResourceResolver resolver = getServiceResolver(SUBSERVICE_BOOKKEEPER)) { + recordMessageProcessingStart(message); Map<String, 
Object> props = this.buildProcessorPropertiesFromMessage(pkgMsg); long invalidationStartTime = currentTimeMillis(); @@ -201,6 +228,7 @@ public class BookKeeper { storeOffset(resolver, offset); resolver.commit(); + recordMessageProcessingCompleted(message); clearPackageRetriesOnSuccess(pkgMsg); @@ -446,8 +474,27 @@ public class BookKeeper { log.info("Stored status {}", statusMap); } - private void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException { - processedOffsets.store(resolver, KEY_OFFSET, offset); + /** + * Store the provided offset in the repository. The offset is only persisted if it is the smallest + * offset of all entries in the messagesBeingProcessed set (or if that set is empty); that indicates that all messages with lower offsets + * have already been processed, so it is now safe to mark this offset as the latest persisted one. + * @param offset the offset to persist + * @throws PersistenceException if storing the offset fails + */ + private synchronized void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException { + long smallestOffset = Long.MAX_VALUE; + if (messagesBeingProcessed.isEmpty()) { + smallestOffset = offset; // we have to store the offset if no other message is being processed concurrently + } else { + for (MessageInfo mi: messagesBeingProcessed) { + if (mi.getOffset() < smallestOffset) { + smallestOffset = mi.getOffset(); + } + } + } + if (smallestOffset == offset) { + processedOffsets.store(resolver, KEY_OFFSET, offset); + } } private ResourceResolver getServiceResolver(String subService) throws LoginException { diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java index bda2a83..ea4479b 100644 --- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java +++
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java @@ -332,9 +332,9 @@ public class DistributionSubscriber { if (skip) { bookKeeper.removePackage(pkgMsg, info.getOffset()); } else if (type == INVALIDATE) { - bookKeeper.invalidateCache(pkgMsg, info.getOffset(), createdTime, importStartTime); + bookKeeper.invalidateCache(pkgMsg, info, createdTime, importStartTime); } else { - bookKeeper.importPackage(pkgMsg, info.getOffset(), createdTime, importStartTime); + bookKeeper.importPackage(pkgMsg, info, createdTime, importStartTime); } blockingSendStoredStatus(); } finally { diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java index e297bc5..ed3b20b 100644 --- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java +++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java @@ -26,11 +26,19 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; import java.time.Duration; +import java.util.Collections; import java.util.Date; +import java.util.Map; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.apache.sling.api.resource.LoginException; @@ -44,12 +52,14 @@ import org.apache.sling.distribution.ImportPostProcessor; import org.apache.sling.distribution.ImportPreProcessor; import org.apache.sling.distribution.InvalidationProcessor; import org.apache.sling.distribution.common.DistributionException; +import 
org.apache.sling.distribution.journal.MessageInfo; import org.apache.sling.distribution.journal.messages.LogMessage; import org.apache.sling.distribution.journal.messages.PackageMessage; import org.apache.sling.distribution.journal.messages.PackageStatusMessage; import org.apache.sling.distribution.packaging.DistributionPackageBuilder; import org.apache.sling.testing.mock.osgi.junit.OsgiContext; import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -139,7 +149,7 @@ public class BookKeeperTest { public void testPackageImport() throws DistributionException { try { Date createdTime = new Date(currentTimeMillis()); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, createdTime, createdTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), createdTime, createdTime); } finally { assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0)); } @@ -155,7 +165,7 @@ public class BookKeeperTest { for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) { try { Date createdTime = new Date(currentTimeMillis()); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, createdTime, createdTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), createdTime, createdTime); } catch (DistributionException e) { } } @@ -180,7 +190,7 @@ public class BookKeeperTest { }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), Mockito.any(PackageMessage.class)); Date simulatedStartTime = new Date( currentTimeMillis() - Duration.ofMinutes(6).toMillis( )); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, simulatedStartTime, simulatedStartTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), simulatedStartTime, 
simulatedStartTime); assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L)); } @@ -202,7 +212,7 @@ public class BookKeeperTest { }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), Mockito.any(PackageMessage.class)); Date simulatedStartTime = new Date( currentTimeMillis() - Duration.ofMinutes(1).toMillis()); - bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, new Date(currentTimeMillis()), simulatedStartTime); + bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), buildMessageInfo(10), new Date(currentTimeMillis()), simulatedStartTime); assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L)); } @@ -211,7 +221,7 @@ public class BookKeeperTest { public void testCacheInvalidation() throws DistributionException { try { Date simulatedStartTime = new Date( currentTimeMillis() - Duration.ofMinutes(1).toMillis()); - bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), 10L, simulatedStartTime, simulatedStartTime); + bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE), buildMessageInfo(10), simulatedStartTime, simulatedStartTime); } finally { assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0)); } @@ -227,7 +237,98 @@ public class BookKeeperTest { assertThat("Should be null", offset2, equalTo(newOffset)); } + /** + * Verifies that concurrent importPackage() calls only persist an offset when all messages + * with lower offsets have completed. Uses CountDownLatches to control packageHandler.apply() + * completion order without relying on timing, so the test is deterministic. 
+ */ + @Test + public void testConcurrentImportStoresOffsetOnlyWhenAllLowerOffsetsCompleted() + throws InterruptedException, DistributionException { + final int count = 4; + final long timeoutSeconds = 2; + + // Latches to block each apply() until we release it (keyed by offset) + final CountDownLatch[] startLatches = new CountDownLatch[count]; + final CountDownLatch[] doneLatches = new CountDownLatch[count]; + for (int i = 0; i < count; i++) { + startLatches[i] = new CountDownLatch(1); + doneLatches[i] = new CountDownLatch(1); + } + final CountDownLatch allEnteredApply = new CountDownLatch(count); + + doAnswer(invocation -> { + PackageMessage pkgMsg = invocation.getArgument(1); + String pkgId = pkgMsg.getPkgId(); + int offset = Integer.parseInt(pkgId.replace("pkg-", "")); + allEnteredApply.countDown(); + startLatches[offset].await(); // only continue when the latch is counted down + return null; + }).when(packageHandler).apply(any(ResourceResolver.class), any(PackageMessage.class)); + + ExecutorService executor = Executors.newFixedThreadPool(count); + try { + for (int offset = 0; offset < count; offset++) { + final int off = offset; + executor.submit(() -> { + try { + bookKeeper.importPackage( + buildPackageMessage(PackageMessage.ReqType.ADD, (long) off), + buildMessageInfo(off), + new Date(), + new Date()); + } catch (DistributionException e) { + throw new AssertionError("importPackage failed for offset " + off, e); + } finally { + doneLatches[off].countDown(); + } + }); + } + + Assert.assertTrue("All threads should enter apply()", + allEnteredApply.await(timeoutSeconds, TimeUnit.SECONDS)); + + // No import has completed yet -> offset must still be -1 + assertThat(bookKeeper.loadOffset(), equalTo(-1L)); + + // Complete offset 2 first. Stored offset must not advance (0,1 still in flight). 
+ startLatches[2].countDown(); + Assert.assertTrue("Offset 2 import should complete", + doneLatches[2].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(-1L)); + + // Complete offset 0. storeOffset(0) runs while set still has {0,1,3}; smallest=0 -> store 0. + startLatches[0].countDown(); + Assert.assertTrue("Offset 0 import should complete", + doneLatches[0].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(0L)); + + // Complete offset 3. Stored offset must stay 0 (offset 1 still in flight). + startLatches[3].countDown(); + Assert.assertTrue("Offset 3 import should complete", + doneLatches[3].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(0L)); + + // Complete offset 1. Now 1 is the only one left in set -> store 1. + startLatches[1].countDown(); + Assert.assertTrue("Offset 1 import should complete", + doneLatches[1].await(timeoutSeconds, TimeUnit.SECONDS)); + assertThat(bookKeeper.loadOffset(), equalTo(1L)); + } finally { + executor.shutdown(); + executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS); + } + } + PackageMessage buildPackageMessage(PackageMessage.ReqType reqType) { + return buildPackageMessage(reqType, null); + } + + /** + * Build a package message with an optional offset encoded in pkgId for tests that need to + * identify the message (e.g. "pkg-2" for offset 2). If offset is null, a random UUID is used. + */ + PackageMessage buildPackageMessage(PackageMessage.ReqType reqType, Long offsetForPkgId) { PackageMessage msg = mock(PackageMessage.class); when(msg.getPkgLength()) .thenReturn(100L); @@ -238,8 +339,39 @@ public class BookKeeperTest { when(msg.getPaths()) .thenReturn(singletonList("/content")); when(msg.getPkgId()) - .thenReturn(UUID.randomUUID().toString()); + .thenReturn(offsetForPkgId != null ? 
"pkg-" + offsetForPkgId : UUID.randomUUID().toString()); return msg; } + + MessageInfo buildMessageInfo(long offset) { + return new MessageInfo() { + + @Override + public String getTopic() { + return "testTopic"; + } + + @Override + public int getPartition() { + return 0; + } + + @Override + public long getOffset() { + return offset; + } + + @Override + public long getCreateTime() { + return System.currentTimeMillis(); + } + + @Override + public Map<String, String> getProps() { + return Collections.emptyMap(); + } + + }; + } }
