This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch OAK-12129
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 003532cdd6e89b7eb3aa04e6cce26ec862c9265e
Author: rishabhdaim <[email protected]>
AuthorDate: Fri Mar 6 17:13:56 2026 +0530

    OAK-12129 : improve test coverage for ChangeProcessor stopAndWait behaviour
---
 .../oak/jcr/observation/ChangeProcessorTest.java   | 174 ++++++++++++++++-----
 1 file changed, 135 insertions(+), 39 deletions(-)

diff --git a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
index eefcdc91c6..3844e992e1 100644
--- a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
+++ b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
@@ -9,7 +9,7 @@
  *
  *   http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed in writing,
+ * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  * KIND, either express or implied.  See the License for the
@@ -18,40 +18,50 @@
  */
 package org.apache.jackrabbit.oak.jcr.observation;
 
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
+import java.lang.reflect.Field;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.jackrabbit.commons.observation.ListenerTracker;
 import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
 import org.apache.jackrabbit.oak.namepath.NamePathMapper;
-import org.apache.jackrabbit.commons.observation.ListenerTracker;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
 import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
+import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
 import org.apache.jackrabbit.oak.stats.StatisticManager;
-import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
-import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
 import org.junit.Assert;
-
-import java.lang.reflect.Field;
-import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
- * Unit cases for {@link ChangeProcessor} class
+ * Unit tests for {@link ChangeProcessor}.
+ *
+ * <p>Contains two groups of tests:
+ * <ol>
+ *   <li>New-implementation-specific tests that verify the {@link AtomicBoolean}
+ *       {@code stopped} flag introduced when Guava {@code Monitor} was removed.</li>
+ *   <li>Behavioural tests for {@link ChangeProcessor#stop()} and
+ *       {@link ChangeProcessor#stopAndWait(int, TimeUnit)} that are written
+ *       against the public API only and pass against both the old Guava-based
+ *       and the new {@link ReentrantLock}-based implementations.</li>
+ * </ol>
  */
 public class ChangeProcessorTest {
 
+    // ── stopped-flag tests (new implementation specific) ───────────────────
+
     @Test
     public void testStopSetStoppedFlagCorrectly() {
         ChangeProcessor changeProcessor = createChangeProcessor();
         CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
         setRegistration(changeProcessor, registration);
-        // Assert stopped flag is false before calling stop()
         Assert.assertFalse("stopped flag should be false before stop()", 
getStoppedFlag(changeProcessor).get());
-        // Now call stop(), which should throw due to leave() without enter()
         changeProcessor.stop();
         Mockito.verify(registration).unregister();
-        // Assert stopped flag is true
         Assert.assertTrue("stopped flag should be true after stop()", 
getStoppedFlag(changeProcessor).get());
-
-
     }
 
     @Test
@@ -61,14 +71,12 @@ public class ChangeProcessorTest {
         setRegistration(changeProcessor, registration);
 
         String toStringBeforeStop = changeProcessor.toString();
-        System.out.println("Before stop: " + toStringBeforeStop);
         Assert.assertTrue("toString() before stop should indicate 
running=true: " + toStringBeforeStop,
                 toStringBeforeStop.contains("running=true"));
 
         changeProcessor.stop();
 
         String toStringAfterStop = changeProcessor.toString();
-        System.out.println("After stop: " + toStringAfterStop);
         Assert.assertTrue("toString() after stop should indicate 
running=false: " + toStringAfterStop,
                 toStringAfterStop.contains("running=false"));
     }
@@ -78,21 +86,113 @@ public class ChangeProcessorTest {
         ChangeProcessor changeProcessor = createChangeProcessor();
         CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
         setRegistration(changeProcessor, registration);
-        // Assert stopped flag is false before calling stop()
         Assert.assertFalse("stopped flag should be false before stop()", 
getStoppedFlag(changeProcessor).get());
-        // Call stop() first time
         changeProcessor.stop();
         Assert.assertTrue("stopped flag should be true after first stop()", 
getStoppedFlag(changeProcessor).get());
         Mockito.verify(registration).unregister();
-        // Call stop() second time
         changeProcessor.stop();
         Assert.assertTrue("stopped flag should remain true after second 
stop()", getStoppedFlag(changeProcessor).get());
-        // unregister should only be called once
         Mockito.verify(registration, Mockito.times(1)).unregister();
     }
 
+    // ── stopAndWait() behavioural tests ─────────────────────────────────────
+
+    /**
+     * stopAndWait() must return {@code true} and call
+     * {@code registration.unregister()} when no other thread holds the
+     * running lock (i.e. no contentChanged() is in progress).
+     */
+    @Test
+    public void testStopAndWaitReturnsTrueWhenNotContended() throws 
InterruptedException {
+        ChangeProcessor cp = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(cp, registration);
 
-    // helper methods
+        boolean result = cp.stopAndWait(1, TimeUnit.SECONDS);
+
+        Assert.assertTrue("stopAndWait should return true when lock is free", 
result);
+        Mockito.verify(registration).unregister();
+    }
+
+    /**
+     * When a concurrent thread holds the running lock (simulating an
+     * in-progress contentChanged() call), stopAndWait() must block until the
+     * timeout elapses and then return {@code false}.
+     */
+    @Test
+    public void testStopAndWaitReturnsFalseOnTimeout() throws Exception {
+        ChangeProcessor cp = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(cp, registration);
+
+        ReentrantLock underlyingLock = getUnderlyingLock(cp);
+        CountDownLatch lockAcquired = new CountDownLatch(1);
+        CountDownLatch testDone = new CountDownLatch(1);
+
+        // Simulate a long-running contentChanged() by holding the underlying lock.
+        Thread holder = new Thread(() -> {
+            underlyingLock.lock();
+            lockAcquired.countDown();
+            try {
+                testDone.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            } finally {
+                underlyingLock.unlock();
+            }
+        });
+        holder.start();
+        lockAcquired.await(2, TimeUnit.SECONDS);
+
+        boolean result = cp.stopAndWait(100, TimeUnit.MILLISECONDS);
+
+        Assert.assertFalse(
+                "stopAndWait should return false when the running lock is held 
by another thread",
+                result);
+
+        testDone.countDown();
+        holder.join(2000);
+    }
+
+    /**
+     * When the processor was already stopped (e.g. by a prior stop() call),
+     * stopAndWait() must return {@code true} immediately without calling
+     * unregister() again.
+     */
+    @Test
+    public void testStopAndWaitReturnsTrueWhenAlreadyStopped() throws 
InterruptedException {
+        ChangeProcessor cp = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(cp, registration);
+
+        cp.stop();
+
+        boolean result = cp.stopAndWait(100, TimeUnit.MILLISECONDS);
+
+        Assert.assertTrue("stopAndWait should return true when already 
stopped", result);
+        Mockito.verify(registration, Mockito.times(1)).unregister();
+    }
+
+    /**
+     * Once stopAndWait() returns {@code true} the processor must report
+     * {@code running=false}.
+     */
+    @Test
+    public void testStopAndWaitSetsRunningToFalse() throws 
InterruptedException {
+        ChangeProcessor cp = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(cp, registration);
+
+        Assert.assertTrue("should be running=true before stopAndWait",
+                cp.toString().contains("running=true"));
+
+        cp.stopAndWait(1, TimeUnit.SECONDS);
+
+        Assert.assertTrue("should be running=false after stopAndWait",
+                cp.toString().contains("running=false"));
+    }
+
+    // ── helpers ─────────────────────────────────────────────────────────────
 
     private static ChangeProcessor createChangeProcessor() {
         ContentSession contentSession = Mockito.mock(ContentSession.class);
@@ -100,20 +200,10 @@ public class ChangeProcessorTest {
         ListenerTracker tracker = Mockito.mock(ListenerTracker.class);
         FilterProvider filter = Mockito.mock(FilterProvider.class);
         StatisticManager statisticManager = 
Mockito.mock(StatisticManager.class);
-        int queueLength = 1;
         CommitRateLimiter commitRateLimiter = 
Mockito.mock(CommitRateLimiter.class);
         BlobAccessProvider blobAccessProvider = 
Mockito.mock(BlobAccessProvider.class);
-
-        return new ChangeProcessor(
-            contentSession,
-            namePathMapper,
-            tracker,
-            filter,
-            statisticManager,
-            queueLength,
-            commitRateLimiter,
-            blobAccessProvider
-        );
+        return new ChangeProcessor(contentSession, namePathMapper, tracker, 
filter,
+                statisticManager, 1, commitRateLimiter, blobAccessProvider);
     }
 
     private static AtomicBoolean getStoppedFlag(ChangeProcessor 
changeProcessor) {
@@ -126,11 +216,17 @@ public class ChangeProcessorTest {
         }
     }
 
-    private static void setRegistration(ChangeProcessor changeProcessor, 
CompositeRegistration registration) {
+    private static ReentrantLock getUnderlyingLock(ChangeProcessor cp) throws 
Exception {
+        Field f = ChangeProcessor.class.getDeclaredField("runningLock");
+        f.setAccessible(true);
+        return (ReentrantLock) f.get(cp);
+    }
+
+    private static void setRegistration(ChangeProcessor cp, 
CompositeRegistration registration) {
         try {
-            Field regField = 
ChangeProcessor.class.getDeclaredField("registration");
-            regField.setAccessible(true);
-            regField.set(changeProcessor, registration);
+            Field f = ChangeProcessor.class.getDeclaredField("registration");
+            f.setAccessible(true);
+            f.set(cp, registration);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }

Reply via email to