This is an automated email from the ASF dual-hosted git repository.

daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d290898902 OAK-12072 : removed Guava's Monitor and Guard (#2721)
d290898902 is described below

commit d290898902170ded6e341cce66982e48d15bd675
Author: Rishabh Kumar <[email protected]>
AuthorDate: Mon Feb 23 19:20:18 2026 +0530

    OAK-12072 : removed Guava's Monitor and Guard (#2721)
    
    * OAK-12072 : removed Guava's Monitor and Guard
    
    * OAK-12072 : added unit cases for ChangeProcessor class
---
 .../oak/jcr/observation/ChangeProcessor.java       | 150 +++++++++++++--------
 .../oak/jcr/observation/ChangeProcessorTest.java   | 138 +++++++++++++++++++
 2 files changed, 235 insertions(+), 53 deletions(-)

diff --git 
a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
 
b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
index 3163afdbaf..4450a38f4e 100644
--- 
a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
+++ 
b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
@@ -29,8 +29,10 @@ import static 
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleW
 
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jcr.observation.Event;
 import javax.jcr.observation.EventIterator;
@@ -73,9 +75,6 @@ import org.jetbrains.annotations.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
-import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard;
-
 /**
  * A {@code ChangeProcessor} generates observation {@link 
javax.jcr.observation.Event}s
  * based on a {@link FilterProvider filter} and delivers them to an {@link 
EventListener}.
@@ -417,8 +416,8 @@ class ChangeProcessor implements FilteringAwareObserver {
         });
     }
 
-    private final Monitor runningMonitor = new Monitor();
-    private final RunningGuard running = new RunningGuard(runningMonitor);
+    private final AtomicBoolean stopped = new AtomicBoolean(false);
+    private final ReentrantLock runningLock = new ReentrantLock(false);
 
     /**
      * Try to stop this change processor if running. This method will wait
@@ -437,18 +436,23 @@ class ChangeProcessor implements FilteringAwareObserver {
      */
     public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
         Validate.checkState(registration != null, "Change processor not 
started");
-        if (running.stop()) {
-            if (runningMonitor.enter(timeOut, unit)) {
-                registration.unregister();
-                runningMonitor.leave();
-                return true;
-            } else {
-                // Timed out
-                return false;
-            }
-        } else {
-            // Stopped already
+
+        if (!stopFlagSet()) {
+            // already stopped, return from here
+            return true;
+        }
+
+        if (!enterWithTimeout(timeOut, unit)) {
+            // timed out acquiring the running lock, return early
+            return false;
+        }
+
+        // lock has been acquired
+        try {
+            registration.unregister();
             return true;
+        } finally {
+            runningLock.unlock();
         }
     }
 
@@ -460,9 +464,15 @@ class ChangeProcessor implements FilteringAwareObserver {
      */
     public synchronized void stop() {
         Validate.checkState(registration != null, "Change processor not 
started");
-        if (running.stop()) {
-            registration.unregister();
-            runningMonitor.leave();
+
+        if (stopFlagSet()) {
+            // Wait until no contentChanged is in the critical section
+            runningLock.lock();
+            try {
+                registration.unregister();
+            } finally {
+                runningLock.unlock();
+            }
         }
     }
 
@@ -503,19 +513,24 @@ class ChangeProcessor implements FilteringAwareObserver {
                 long time = System.nanoTime();
                 boolean hasEvents = events.hasNext();
                 tracker.recordProducerTime(System.nanoTime() - time, 
TimeUnit.NANOSECONDS);
-                if (hasEvents && runningMonitor.enterIf(running)) {
-                    if (commitRateLimiter != null) {
-                        commitRateLimiter.beforeNonBlocking();
-                    }
+                if (hasEvents && enterIfRunning()) {
+                    // lock has been acquired
                     try {
-                        CountingIterator countingEvents = new 
CountingIterator(events);
-                        eventListener.onEvent(countingEvents);
-                        countingEvents.updateCounters(eventCount, 
eventDuration);
-                    } finally {
                         if (commitRateLimiter != null) {
-                            commitRateLimiter.afterNonBlocking();
+                            commitRateLimiter.beforeNonBlocking();
                         }
-                        runningMonitor.leave();
+                        try {
+                            CountingIterator countingEvents = new 
CountingIterator(events);
+                            eventListener.onEvent(countingEvents);
+                            countingEvents.updateCounters(eventCount, 
eventDuration);
+                        } finally {
+                            if (commitRateLimiter != null) {
+                                commitRateLimiter.afterNonBlocking();
+                            }
+                        }
+                    } finally {
+                        // unlock now
+                        runningLock.unlock();
                     }
                 }
             }
@@ -602,29 +617,6 @@ class ChangeProcessor implements FilteringAwareObserver {
         }
     }
 
-    private static class RunningGuard extends Guard {
-        private boolean stopped;
-
-        public RunningGuard(Monitor monitor) {
-            super(monitor);
-        }
-
-        @Override
-        public boolean isSatisfied() {
-            return !stopped;
-        }
-
-        /**
-         * @return  {@code true} if this call set this guard to stopped,
-         *          {@code false} if another call set this guard to stopped 
before.
-         */
-        public boolean stop() {
-            boolean wasStopped = stopped;
-            stopped = true;
-            return !wasStopped;
-        }
-    }
-
     @Override
     public String toString() {
         return "ChangeProcessor ["
@@ -634,7 +626,7 @@ class ChangeProcessor implements FilteringAwareObserver {
                 + ", eventCount=" + eventCount 
                 + ", eventDuration=" + eventDuration 
                 + ", commitRateLimiter=" + commitRateLimiter
-                + ", running=" + running.isSatisfied() + "]";
+                + ", running=" + !stopped.get() + "]";
     }
     
     /** for logging only **/
@@ -689,4 +681,56 @@ class ChangeProcessor implements FilteringAwareObserver {
             return FilterResult.INCLUDE;
         }
     }
+
+    // helper methods for lock/unlocking
+
+    private boolean stopFlagSet() {
+        // true only for the first caller that changes false -> true
+        return stopped.compareAndSet(false, true);
+    }
+
+    private boolean enterIfRunning() {
+        runningLock.lock();
+        boolean ok = false;
+        try {
+            ok = !stopped.get();   // guard: same as RunningGuard.isSatisfied()
+            return ok;
+        } finally {
+            if (!ok) {
+                runningLock.unlock();
+            }
+        }
+    }
+
+    private boolean enterWithTimeout(long timeout, TimeUnit unit) {
+
+        // non-fair fast path
+        if (runningLock.tryLock()) {
+            return true;
+        }
+
+        long timeoutNanos = unit.toNanos(timeout);
+
+        boolean interrupted = false;
+        try {
+            long start = System.nanoTime();
+            long remaining = timeoutNanos;
+
+            while (remaining > 0L) {
+                try {
+                    // timed out
+                    return runningLock.tryLock(remaining, 
TimeUnit.NANOSECONDS);
+                } catch (InterruptedException e) {
+                    interrupted = true;
+                    long elapsed = System.nanoTime() - start;
+                    remaining = timeoutNanos - elapsed;
+                }
+            }
+            return false; // timeout
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
 }
diff --git 
a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
 
b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
new file mode 100644
index 0000000000..eefcdc91c6
--- /dev/null
+++ 
b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.commons.observation.ListenerTracker;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
+import org.apache.jackrabbit.oak.stats.StatisticManager;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
+import org.junit.Assert;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Unit cases for {@link ChangeProcessor} class
+ */
+public class ChangeProcessorTest {
+
+    @Test
+    public void testStopSetStoppedFlagCorrectly() {
+        ChangeProcessor changeProcessor = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(changeProcessor, registration);
+        // Assert stopped flag is false before calling stop()
+        Assert.assertFalse("stopped flag should be false before stop()", 
getStoppedFlag(changeProcessor).get());
+        // Now call stop(), which should throw due to leave() without enter()
+        changeProcessor.stop();
+        Mockito.verify(registration).unregister();
+        // Assert stopped flag is true
+        Assert.assertTrue("stopped flag should be true after stop()", 
getStoppedFlag(changeProcessor).get());
+
+
+    }
+
+    @Test
+    public void testToStringRunningStateBeforeAndAfterStop() {
+        ChangeProcessor changeProcessor = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(changeProcessor, registration);
+
+        String toStringBeforeStop = changeProcessor.toString();
+        System.out.println("Before stop: " + toStringBeforeStop);
+        Assert.assertTrue("toString() before stop should indicate 
running=true: " + toStringBeforeStop,
+                toStringBeforeStop.contains("running=true"));
+
+        changeProcessor.stop();
+
+        String toStringAfterStop = changeProcessor.toString();
+        System.out.println("After stop: " + toStringAfterStop);
+        Assert.assertTrue("toString() after stop should indicate 
running=false: " + toStringAfterStop,
+                toStringAfterStop.contains("running=false"));
+    }
+
+    @Test
+    public void testStopIdempotency() {
+        ChangeProcessor changeProcessor = createChangeProcessor();
+        CompositeRegistration registration = 
Mockito.mock(CompositeRegistration.class);
+        setRegistration(changeProcessor, registration);
+        // Assert stopped flag is false before calling stop()
+        Assert.assertFalse("stopped flag should be false before stop()", 
getStoppedFlag(changeProcessor).get());
+        // Call stop() first time
+        changeProcessor.stop();
+        Assert.assertTrue("stopped flag should be true after first stop()", 
getStoppedFlag(changeProcessor).get());
+        Mockito.verify(registration).unregister();
+        // Call stop() second time
+        changeProcessor.stop();
+        Assert.assertTrue("stopped flag should remain true after second 
stop()", getStoppedFlag(changeProcessor).get());
+        // unregister should only be called once
+        Mockito.verify(registration, Mockito.times(1)).unregister();
+    }
+
+
+    // helper methods
+
+    private static ChangeProcessor createChangeProcessor() {
+        ContentSession contentSession = Mockito.mock(ContentSession.class);
+        NamePathMapper namePathMapper = Mockito.mock(NamePathMapper.class);
+        ListenerTracker tracker = Mockito.mock(ListenerTracker.class);
+        FilterProvider filter = Mockito.mock(FilterProvider.class);
+        StatisticManager statisticManager = 
Mockito.mock(StatisticManager.class);
+        int queueLength = 1;
+        CommitRateLimiter commitRateLimiter = 
Mockito.mock(CommitRateLimiter.class);
+        BlobAccessProvider blobAccessProvider = 
Mockito.mock(BlobAccessProvider.class);
+
+        return new ChangeProcessor(
+            contentSession,
+            namePathMapper,
+            tracker,
+            filter,
+            statisticManager,
+            queueLength,
+            commitRateLimiter,
+            blobAccessProvider
+        );
+    }
+
+    private static AtomicBoolean getStoppedFlag(ChangeProcessor 
changeProcessor) {
+        try {
+            Field stoppedField = 
ChangeProcessor.class.getDeclaredField("stopped");
+            stoppedField.setAccessible(true);
+            return (AtomicBoolean) stoppedField.get(changeProcessor);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void setRegistration(ChangeProcessor changeProcessor, 
CompositeRegistration registration) {
+        try {
+            Field regField = 
ChangeProcessor.class.getDeclaredField("registration");
+            regField.setAccessible(true);
+            regField.set(changeProcessor, registration);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}

Reply via email to