This is an automated email from the ASF dual-hosted git repository.

joerghoh pushed a commit to branch SLING-13021-2
in repository 
https://gitbox.apache.org/repos/asf/sling-org-apache-sling-distribution-journal.git

commit edfa958d9d9836a3103d73746ab122f4d17a3f1e
Author: Joerg Hoh <[email protected]>
AuthorDate: Tue Feb 10 14:48:03 2026 +0100

    SLING-13021 implement offset handling for concurrent invocations
---
 .../journal/bookkeeper/BookKeeper.java             |  55 +++++++-
 .../impl/subscriber/DistributionSubscriber.java    |   4 +-
 .../journal/bookkeeper/BookKeeperTest.java         | 144 ++++++++++++++++++++-
 3 files changed, 191 insertions(+), 12 deletions(-)

diff --git 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 0e5d14b..e65da7a 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -30,8 +30,10 @@ import java.io.StringWriter;
 import java.time.Duration;
 import java.util.Date;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
@@ -48,6 +50,7 @@ import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.InvalidationProcessException;
 import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.MessageInfo;
 import 
org.apache.sling.distribution.journal.impl.event.DistributionFailureEvent;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
@@ -78,6 +81,14 @@ import org.slf4j.LoggerFactory;
  * The clustered and non clustered publish instances use
  * cases can be supported by only running the Subscriber
  * agent on the leader instance.
+ * 
+ * 
+ * The BookKeeper supports the concurrent handling of packages. For that it
+ * keeps track of the messages passed to importPackage() or invalidateCache(),
+ * as processing either of them can take a long time. These in-flight messages
+ * are stored in the messagesBeingProcessed set; an offset is only persisted
+ * if it is the lowest offset of all messages in that set.
+ * 
  */
 public class BookKeeper {
     public static final String STORE_TYPE_STATUS = "statuses";
@@ -107,6 +118,9 @@ public class BookKeeper {
     private final ImportPostProcessor importPostProcessor;
     private final InvalidationProcessor invalidationProcessor;
     private int skippedCounter = 0;
+    
+    // Messages currently being processed by importPackage()/invalidateCache(); read and
+    // modified from the synchronized methods recordMessageProcessingStart/Completed and storeOffset.
+    private final Set<MessageInfo> messagesBeingProcessed = new HashSet<>();
+    
 
     public BookKeeper(ResourceResolverFactory resolverFactory, 
SubscriberMetrics subscriberMetrics,
         PackageHandler packageHandler, EventAdmin eventAdmin, 
Consumer<PackageStatusMessage> sender, Consumer<LogMessage> logSender,
@@ -152,10 +166,12 @@ public class BookKeeper {
      * failing. For those packages importers, we aim at processing packages at 
least
      * once, thanks to the order in which the content updates are applied.
      */
-    public void importPackage(PackageMessage pkgMsg, long offset, Date 
createdTime, Date importStartTime) throws DistributionException {
+    public void importPackage(PackageMessage pkgMsg, MessageInfo message, Date 
createdTime, Date importStartTime) throws DistributionException {
+        long offset = message.getOffset();
         log.debug("Importing distribution package {} at offset={}", pkgMsg, 
offset);
         try (Timer.Context context = 
subscriberMetrics.getImportedPackageDuration().time();
                 ResourceResolver importerResolver = 
getServiceResolver(SUBSERVICE_IMPORTER)) {
+            recordMessageProcessingStart(message);
             // Execute the pre-processor
             preProcess(pkgMsg);
             subscriberMetrics.setCurrentImport(new CurrentImportInfo(pkgMsg, 
offset, importStartTime.getTime()));
@@ -165,6 +181,7 @@ public class BookKeeper {
             }
             storeOffset(importerResolver, offset);
             importerResolver.commit();
+            recordMessageProcessingCompleted(message);
             
subscriberMetrics.getImportedPackageSize().update(pkgMsg.getPkgLength());
             
subscriberMetrics.getPackageDistributedDuration().update((currentTimeMillis() - 
createdTime.getTime()), TimeUnit.MILLISECONDS);
             
@@ -184,10 +201,20 @@ public class BookKeeper {
             subscriberMetrics.clearCurrentImport();
         }
     }
+    
+    /**
+     * Registers a message as in flight, so that storeOffset() treats its offset
+     * as not yet completed.
+     * @param message the message whose processing starts
+     */
+    private synchronized void recordMessageProcessingStart(MessageInfo message) {
+        this.messagesBeingProcessed.add(message);
+    }
+    
+    /**
+     * Removes a message from the in-flight set once its processing has finished,
+     * so that it no longer holds back storeOffset() for higher offsets.
+     * @param message the message whose processing completed
+     */
+    private synchronized void recordMessageProcessingCompleted(MessageInfo message) {
+        this.messagesBeingProcessed.remove(message);
+    }
 
-    public void invalidateCache(PackageMessage pkgMsg, long offset, Date 
createdTime, Date importStartTime) throws DistributionException {
+    public void invalidateCache(PackageMessage pkgMsg, MessageInfo message, 
Date createdTime, Date importStartTime) throws DistributionException {
+        long offset = message.getOffset();
         log.debug("Invalidating the cache for the package {} at offset={}", 
pkgMsg, offset);
         try (ResourceResolver resolver = 
getServiceResolver(SUBSERVICE_BOOKKEEPER)) {
+            recordMessageProcessingStart(message);
             Map<String, Object> props = 
this.buildProcessorPropertiesFromMessage(pkgMsg);
 
             long invalidationStartTime = currentTimeMillis();
@@ -201,6 +228,7 @@ public class BookKeeper {
 
             storeOffset(resolver, offset);
             resolver.commit();
+            recordMessageProcessingCompleted(message);
 
             clearPackageRetriesOnSuccess(pkgMsg);
 
@@ -446,8 +474,27 @@ public class BookKeeper {
         log.info("Stored status {}", statusMap);
     }
 
-    private void storeOffset(ResourceResolver resolver, long offset) throws 
PersistenceException {
-        processedOffsets.store(resolver, KEY_OFFSET, offset);
+    /**
+     * Stores the provided offset in the repository, but only if it is the smallest offset
+     * of all messages in the messagesBeingProcessed set. importPackage() and invalidateCache()
+     * register their message via recordMessageProcessingStart() before calling this method,
+     * so being the smallest in-flight offset means that no message with a lower offset is
+     * still being processed and it is safe to mark this offset as the latest persisted one.
+     * If the set is empty, the offset is stored unconditionally.
+     * <p>
+     * NOTE(review): consequences of this rule that should be confirmed as intended:
+     * <ul>
+     * <li>Offsets of messages that completed while a lower offset was still in flight are
+     * not persisted later; when the lower message completes, only its own offset is stored.</li>
+     * <li>A message that is registered but never reaches recordMessageProcessingCompleted()
+     * (that call sits after the commit inside the try block) stays in the set and keeps any
+     * higher offset from being stored.</li>
+     * <li>If a lower-offset message registers only after a higher offset has already been
+     * stored, its offset is stored afterwards and the persisted offset moves backwards.</li>
+     * </ul>
+     * @param resolver the resolver used to store the offset; committing it is up to the caller
+     * @param offset the offset to persist
+     * @throws PersistenceException if the offset cannot be stored
+     */
+    private synchronized void storeOffset(ResourceResolver resolver, long offset) throws PersistenceException {
+        long smallestOffset;
+        if (messagesBeingProcessed.isEmpty()) {
+            // no other message is being processed concurrently, so the offset can be stored
+            smallestOffset = offset;
+        } else {
+            smallestOffset = messagesBeingProcessed.stream()
+                    .mapToLong(MessageInfo::getOffset)
+                    .min()
+                    .getAsLong();
+        }
+        if (smallestOffset == offset) {
+            processedOffsets.store(resolver, KEY_OFFSET, offset);
+        } else {
+            log.debug("Not storing offset={}, the smallest offset still being processed is {}", offset, smallestOffset);
+        }
     }
 
     private ResourceResolver getServiceResolver(String subService) throws 
LoginException {
diff --git 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index bda2a83..ea4479b 100644
--- 
a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ 
b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -332,9 +332,9 @@ public class DistributionSubscriber {
             if (skip) {
                 bookKeeper.removePackage(pkgMsg, info.getOffset());
             } else if (type == INVALIDATE) {
-                bookKeeper.invalidateCache(pkgMsg, info.getOffset(), 
createdTime, importStartTime);
+                bookKeeper.invalidateCache(pkgMsg, info, createdTime, 
importStartTime);
             } else {
-                bookKeeper.importPackage(pkgMsg, info.getOffset(), 
createdTime, importStartTime);
+                bookKeeper.importPackage(pkgMsg, info, createdTime, 
importStartTime);
             }
             blockingSendStoredStatus();
         } finally {
diff --git 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index e297bc5..ed3b20b 100644
--- 
a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ 
b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -26,11 +26,19 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.when;
+import static org.mockito.ArgumentMatchers.any;
 
 import java.time.Duration;
+import java.util.Collections;
 import java.util.Date;
+import java.util.Map;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 
 import org.apache.sling.api.resource.LoginException;
@@ -44,12 +52,14 @@ import org.apache.sling.distribution.ImportPostProcessor;
 import org.apache.sling.distribution.ImportPreProcessor;
 import org.apache.sling.distribution.InvalidationProcessor;
 import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.messages.LogMessage;
 import org.apache.sling.distribution.journal.messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.testing.mock.osgi.junit.OsgiContext;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -139,7 +149,7 @@ public class BookKeeperTest {
     public void testPackageImport() throws DistributionException {
         try {
             Date createdTime = new Date(currentTimeMillis());
-                       
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
createdTime, createdTime);
+                       
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 
buildMessageInfo(10), createdTime, createdTime);
         } finally {
             assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
         }
@@ -155,7 +165,7 @@ public class BookKeeperTest {
         for (int c=0; c< BookKeeper.NUM_ERRORS_BLOCKING + 1; c++) {
             try {
                 Date createdTime = new Date(currentTimeMillis());
-                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
createdTime, createdTime);
+                
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 
buildMessageInfo(10), createdTime, createdTime);
             } catch (DistributionException e) {
             }
         }
@@ -180,7 +190,7 @@ public class BookKeeperTest {
         }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         
         Date simulatedStartTime = new Date( currentTimeMillis() - 
Duration.ofMinutes(6).toMillis( ));
-        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
simulatedStartTime, simulatedStartTime);
+        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 
buildMessageInfo(10), simulatedStartTime, simulatedStartTime);
         
         assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L));
     }
@@ -202,7 +212,7 @@ public class BookKeeperTest {
         }).when(packageHandler).apply(Mockito.any(ResourceResolver.class), 
Mockito.any(PackageMessage.class));
         
         Date simulatedStartTime = new Date( currentTimeMillis() - 
Duration.ofMinutes(1).toMillis());
-        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 10, 
new Date(currentTimeMillis()), simulatedStartTime);
+        
bookKeeper.importPackage(buildPackageMessage(PackageMessage.ReqType.ADD), 
buildMessageInfo(10), new Date(currentTimeMillis()), simulatedStartTime);
         
         assertThat(subscriberMetrics.getCurrentImportDuration(), equalTo(0L));
     }
@@ -211,7 +221,7 @@ public class BookKeeperTest {
     public void testCacheInvalidation() throws DistributionException {
         try {
                Date simulatedStartTime = new Date( currentTimeMillis() - 
Duration.ofMinutes(1).toMillis());
-            
bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE),
 10L, simulatedStartTime, simulatedStartTime);
+            
bookKeeper.invalidateCache(buildPackageMessage(PackageMessage.ReqType.INVALIDATE),
 buildMessageInfo(10), simulatedStartTime, simulatedStartTime);
         } finally {
             assertThat(bookKeeper.getRetries(PUB_AGENT_NAME), equalTo(0));
         }
@@ -227,7 +237,98 @@ public class BookKeeperTest {
        assertThat("Should be null", offset2, equalTo(newOffset));
     }
 
+    /**
+     * Verifies that concurrent importPackage() calls only persist an offset when all messages
+     * with lower offsets have completed. CountDownLatches control the order in which
+     * packageHandler.apply() returns, so the test does not depend on timing.
+     * Failures inside the worker threads are recorded and rethrown on the test thread, and
+     * the executor is stopped with shutdownNow() so that workers still blocked in apply()
+     * are interrupted when an assertion fails early.
+     */
+    @Test
+    public void testConcurrentImportStoresOffsetOnlyWhenAllLowerOffsetsCompleted()
+            throws InterruptedException, DistributionException {
+        final int count = 4;
+        final long timeoutSeconds = 2;
+
+        // Latches to block each apply() until the test releases it (indexed by offset)
+        final CountDownLatch[] startLatches = new CountDownLatch[count];
+        final CountDownLatch[] doneLatches = new CountDownLatch[count];
+        for (int i = 0; i < count; i++) {
+            startLatches[i] = new CountDownLatch(1);
+            doneLatches[i] = new CountDownLatch(1);
+        }
+        final CountDownLatch allEnteredApply = new CountDownLatch(count);
+        // Throwables raised by the workers, indexed by offset. The Futures returned by
+        // submit() are never inspected, so without this a failing import would only show
+        // up as a timeout. Each slot is written before the matching doneLatch is counted
+        // down, which makes the write visible after a successful await().
+        final Throwable[] failures = new Throwable[count];
+
+        doAnswer(invocation -> {
+            PackageMessage pkgMsg = invocation.getArgument(1);
+            int offset = Integer.parseInt(pkgMsg.getPkgId().replace("pkg-", ""));
+            allEnteredApply.countDown();
+            startLatches[offset].await(); // only continue once the test releases this offset
+            return null;
+        }).when(packageHandler).apply(any(ResourceResolver.class), any(PackageMessage.class));
+
+        ExecutorService executor = Executors.newFixedThreadPool(count);
+        try {
+            for (int offset = 0; offset < count; offset++) {
+                final int off = offset;
+                executor.submit(() -> {
+                    try {
+                        bookKeeper.importPackage(
+                                buildPackageMessage(PackageMessage.ReqType.ADD, (long) off),
+                                buildMessageInfo(off),
+                                new Date(),
+                                new Date());
+                    } catch (Throwable t) { // thread boundary: hand everything to the test thread
+                        failures[off] = t;
+                    } finally {
+                        doneLatches[off].countDown();
+                    }
+                });
+            }
+
+            Assert.assertTrue("All threads should enter apply()",
+                    allEnteredApply.await(timeoutSeconds, TimeUnit.SECONDS));
+
+            // No import has completed yet -> offset must still be -1
+            assertThat(bookKeeper.loadOffset(), equalTo(-1L));
+
+            // Complete offset 2 first. Stored offset must not advance (0,1 still in flight).
+            startLatches[2].countDown();
+            assertImportCompleted(2, doneLatches, failures, timeoutSeconds);
+            assertThat(bookKeeper.loadOffset(), equalTo(-1L));
+
+            // Complete offset 0. storeOffset(0) runs while the set still has {0,1,3}; smallest=0 -> store 0.
+            startLatches[0].countDown();
+            assertImportCompleted(0, doneLatches, failures, timeoutSeconds);
+            assertThat(bookKeeper.loadOffset(), equalTo(0L));
+
+            // Complete offset 3. Stored offset must stay 0 (offset 1 still in flight).
+            startLatches[3].countDown();
+            assertImportCompleted(3, doneLatches, failures, timeoutSeconds);
+            assertThat(bookKeeper.loadOffset(), equalTo(0L));
+
+            // Complete offset 1. Now 1 is the only one left in the set -> store 1.
+            startLatches[1].countDown();
+            assertImportCompleted(1, doneLatches, failures, timeoutSeconds);
+            assertThat(bookKeeper.loadOffset(), equalTo(1L));
+        } finally {
+            // interrupt workers still blocked on a start latch if an assertion failed early
+            executor.shutdownNow();
+            executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Waits until the import for the given offset has finished and rethrows any failure
+     * recorded by its worker thread.
+     */
+    private static void assertImportCompleted(int offset, CountDownLatch[] doneLatches,
+            Throwable[] failures, long timeoutSeconds) throws InterruptedException {
+        Assert.assertTrue("Offset " + offset + " import should complete",
+                doneLatches[offset].await(timeoutSeconds, TimeUnit.SECONDS));
+        if (failures[offset] != null) {
+            throw new AssertionError("importPackage failed for offset " + offset, failures[offset]);
+        }
+    }
+
     PackageMessage buildPackageMessage(PackageMessage.ReqType reqType) {
+        // no offset given, so the overload falls back to a random UUID as package id
+        return buildPackageMessage(reqType, null);
+    }
+
+    /**
+     * Build a package message with an optional offset encoded in pkgId for 
tests that need to
+     * identify the message (e.g. "pkg-2" for offset 2). If offset is null, a 
random UUID is used.
+     */
+    PackageMessage buildPackageMessage(PackageMessage.ReqType reqType, Long 
offsetForPkgId) {
         PackageMessage msg = mock(PackageMessage.class);
         when(msg.getPkgLength())
                 .thenReturn(100L);
@@ -238,8 +339,39 @@ public class BookKeeperTest {
         when(msg.getPaths())
                 .thenReturn(singletonList("/content"));
         when(msg.getPkgId())
-                .thenReturn(UUID.randomUUID().toString());
+                .thenReturn(offsetForPkgId != null ? "pkg-" + offsetForPkgId : 
UUID.randomUUID().toString());
         return msg;
     }
+    
+    /**
+     * Builds a MessageInfo for the given offset on partition 0 of "testTopic".
+     * The creation time is captured once, so repeated getCreateTime() calls return the
+     * same value. The anonymous instance does not override equals()/hashCode(); assuming
+     * MessageInfo is an interface, every call yields a distinct HashSet entry.
+     * @param offset the offset reported by getOffset()
+     * @return a new MessageInfo with a fixed creation time and no properties
+     */
+    MessageInfo buildMessageInfo(long offset) {
+        final long createTime = System.currentTimeMillis();
+        return new MessageInfo() {
+
+            @Override
+            public String getTopic() {
+                return "testTopic";
+            }
+
+            @Override
+            public int getPartition() {
+                return 0;
+            }
+
+            @Override
+            public long getOffset() {
+                return offset;
+            }
+
+            @Override
+            public long getCreateTime() {
+                return createTime;
+            }
+
+            @Override
+            public Map<String, String> getProps() {
+                return Collections.emptyMap();
+            }
+
+            @Override
+            public String toString() {
+                return "MessageInfo[topic=testTopic, partition=0, offset=" + offset + "]";
+            }
+        };
+    }
 
 }

Reply via email to