This is an automated email from the ASF dual-hosted git repository.
daim pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
The following commit(s) were added to refs/heads/trunk by this push:
new d290898902 OAK-12072 : removed Guava's Monitor and Guard (#2721)
d290898902 is described below
commit d290898902170ded6e341cce66982e48d15bd675
Author: Rishabh Kumar <[email protected]>
AuthorDate: Mon Feb 23 19:20:18 2026 +0530
OAK-12072 : removed Guava's Monitor and Guard (#2721)
* OAK-12072 : removed Guava's Monitor and Guard
* OAK-12072 : added unit cases for ChangeProcessor class
---
.../oak/jcr/observation/ChangeProcessor.java | 150 +++++++++++++--------
.../oak/jcr/observation/ChangeProcessorTest.java | 138 +++++++++++++++++++
2 files changed, 235 insertions(+), 53 deletions(-)
diff --git
a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
index 3163afdbaf..4450a38f4e 100644
---
a/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
+++
b/oak-jcr/src/main/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.java
@@ -29,8 +29,10 @@ import static
org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils.scheduleW
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
@@ -73,9 +75,6 @@ import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.jackrabbit.guava.common.util.concurrent.Monitor;
-import org.apache.jackrabbit.guava.common.util.concurrent.Monitor.Guard;
-
/**
* A {@code ChangeProcessor} generates observation {@link
javax.jcr.observation.Event}s
* based on a {@link FilterProvider filter} and delivers them to an {@link
EventListener}.
@@ -417,8 +416,8 @@ class ChangeProcessor implements FilteringAwareObserver {
});
}
- private final Monitor runningMonitor = new Monitor();
- private final RunningGuard running = new RunningGuard(runningMonitor);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
+ private final ReentrantLock runningLock = new ReentrantLock(false);
/**
* Try to stop this change processor if running. This method will wait
@@ -437,18 +436,23 @@ class ChangeProcessor implements FilteringAwareObserver {
*/
public synchronized boolean stopAndWait(int timeOut, TimeUnit unit) {
Validate.checkState(registration != null, "Change processor not
started");
- if (running.stop()) {
- if (runningMonitor.enter(timeOut, unit)) {
- registration.unregister();
- runningMonitor.leave();
- return true;
- } else {
- // Timed out
- return false;
- }
- } else {
- // Stopped already
+
+ if (!stopFlagSet()) {
+ // already stopped, return from here
+ return true;
+ }
+
+ if (!enterWithTimeout(timeOut, unit)) {
+ // timed out acquiring the running lock, return early
+ return false;
+ }
+
+ // lock has been acquired
+ try {
+ registration.unregister();
return true;
+ } finally {
+ runningLock.unlock();
}
}
@@ -460,9 +464,15 @@ class ChangeProcessor implements FilteringAwareObserver {
*/
public synchronized void stop() {
Validate.checkState(registration != null, "Change processor not
started");
- if (running.stop()) {
- registration.unregister();
- runningMonitor.leave();
+
+ if (stopFlagSet()) {
+ // Wait until no contentChanged is in the critical section
+ runningLock.lock();
+ try {
+ registration.unregister();
+ } finally {
+ runningLock.unlock();
+ }
}
}
@@ -503,19 +513,24 @@ class ChangeProcessor implements FilteringAwareObserver {
long time = System.nanoTime();
boolean hasEvents = events.hasNext();
tracker.recordProducerTime(System.nanoTime() - time,
TimeUnit.NANOSECONDS);
- if (hasEvents && runningMonitor.enterIf(running)) {
- if (commitRateLimiter != null) {
- commitRateLimiter.beforeNonBlocking();
- }
+ if (hasEvents && enterIfRunning()) {
+ // lock has been acquired
try {
- CountingIterator countingEvents = new
CountingIterator(events);
- eventListener.onEvent(countingEvents);
- countingEvents.updateCounters(eventCount,
eventDuration);
- } finally {
if (commitRateLimiter != null) {
- commitRateLimiter.afterNonBlocking();
+ commitRateLimiter.beforeNonBlocking();
}
- runningMonitor.leave();
+ try {
+ CountingIterator countingEvents = new
CountingIterator(events);
+ eventListener.onEvent(countingEvents);
+ countingEvents.updateCounters(eventCount,
eventDuration);
+ } finally {
+ if (commitRateLimiter != null) {
+ commitRateLimiter.afterNonBlocking();
+ }
+ }
+ } finally {
+ // unlock now
+ runningLock.unlock();
}
}
}
@@ -602,29 +617,6 @@ class ChangeProcessor implements FilteringAwareObserver {
}
}
- private static class RunningGuard extends Guard {
- private boolean stopped;
-
- public RunningGuard(Monitor monitor) {
- super(monitor);
- }
-
- @Override
- public boolean isSatisfied() {
- return !stopped;
- }
-
- /**
- * @return {@code true} if this call set this guard to stopped,
- * {@code false} if another call set this guard to stopped
before.
- */
- public boolean stop() {
- boolean wasStopped = stopped;
- stopped = true;
- return !wasStopped;
- }
- }
-
@Override
public String toString() {
return "ChangeProcessor ["
@@ -634,7 +626,7 @@ class ChangeProcessor implements FilteringAwareObserver {
+ ", eventCount=" + eventCount
+ ", eventDuration=" + eventDuration
+ ", commitRateLimiter=" + commitRateLimiter
- + ", running=" + running.isSatisfied() + "]";
+ + ", running=" + !stopped.get() + "]";
}
/** for logging only **/
@@ -689,4 +681,56 @@ class ChangeProcessor implements FilteringAwareObserver {
return FilterResult.INCLUDE;
}
}
+
+ // helper methods for locking/unlocking
+
+ private boolean stopFlagSet() {
+ // true only for the first caller that changes false -> true
+ return stopped.compareAndSet(false, true);
+ }
+
+ private boolean enterIfRunning() {
+ runningLock.lock();
+ boolean ok = false;
+ try {
+ ok = !stopped.get(); // guard: same as RunningGuard.isSatisfied()
+ return ok;
+ } finally {
+ if (!ok) {
+ runningLock.unlock();
+ }
+ }
+ }
+
+ private boolean enterWithTimeout(long timeout, TimeUnit unit) {
+
+ // non-fair fast path
+ if (runningLock.tryLock()) {
+ return true;
+ }
+
+ long timeoutNanos = unit.toNanos(timeout);
+
+ boolean interrupted = false;
+ try {
+ long start = System.nanoTime();
+ long remaining = timeoutNanos;
+
+ while (remaining > 0L) {
+ try {
+ // wait for the lock for at most the remaining time
+ return runningLock.tryLock(remaining,
TimeUnit.NANOSECONDS);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ long elapsed = System.nanoTime() - start;
+ remaining = timeoutNanos - elapsed;
+ }
+ }
+ return false; // timeout
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
}
diff --git
a/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
new file mode 100644
index 0000000000..eefcdc91c6
--- /dev/null
+++
b/oak-jcr/src/test/java/org/apache/jackrabbit/oak/jcr/observation/ChangeProcessorTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.jackrabbit.oak.jcr.observation;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
+import org.apache.jackrabbit.oak.api.ContentSession;
+import org.apache.jackrabbit.oak.namepath.NamePathMapper;
+import org.apache.jackrabbit.commons.observation.ListenerTracker;
+import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
+import org.apache.jackrabbit.oak.stats.StatisticManager;
+import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
+import org.apache.jackrabbit.oak.api.blob.BlobAccessProvider;
+import org.junit.Assert;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Unit tests for the {@link ChangeProcessor} class
+ */
+public class ChangeProcessorTest {
+
+ @Test
+ public void testStopSetStoppedFlagCorrectly() {
+ ChangeProcessor changeProcessor = createChangeProcessor();
+ CompositeRegistration registration =
Mockito.mock(CompositeRegistration.class);
+ setRegistration(changeProcessor, registration);
+ // Assert stopped flag is false before calling stop()
+ Assert.assertFalse("stopped flag should be false before stop()",
getStoppedFlag(changeProcessor).get());
+ // Now call stop(), which should unregister the registration and set the stopped flag
+ changeProcessor.stop();
+ Mockito.verify(registration).unregister();
+ // Assert stopped flag is true
+ Assert.assertTrue("stopped flag should be true after stop()",
getStoppedFlag(changeProcessor).get());
+
+
+ }
+
+ @Test
+ public void testToStringRunningStateBeforeAndAfterStop() {
+ ChangeProcessor changeProcessor = createChangeProcessor();
+ CompositeRegistration registration =
Mockito.mock(CompositeRegistration.class);
+ setRegistration(changeProcessor, registration);
+
+ String toStringBeforeStop = changeProcessor.toString();
+ System.out.println("Before stop: " + toStringBeforeStop);
+ Assert.assertTrue("toString() before stop should indicate
running=true: " + toStringBeforeStop,
+ toStringBeforeStop.contains("running=true"));
+
+ changeProcessor.stop();
+
+ String toStringAfterStop = changeProcessor.toString();
+ System.out.println("After stop: " + toStringAfterStop);
+ Assert.assertTrue("toString() after stop should indicate
running=false: " + toStringAfterStop,
+ toStringAfterStop.contains("running=false"));
+ }
+
+ @Test
+ public void testStopIdempotency() {
+ ChangeProcessor changeProcessor = createChangeProcessor();
+ CompositeRegistration registration =
Mockito.mock(CompositeRegistration.class);
+ setRegistration(changeProcessor, registration);
+ // Assert stopped flag is false before calling stop()
+ Assert.assertFalse("stopped flag should be false before stop()",
getStoppedFlag(changeProcessor).get());
+ // Call stop() first time
+ changeProcessor.stop();
+ Assert.assertTrue("stopped flag should be true after first stop()",
getStoppedFlag(changeProcessor).get());
+ Mockito.verify(registration).unregister();
+ // Call stop() second time
+ changeProcessor.stop();
+ Assert.assertTrue("stopped flag should remain true after second
stop()", getStoppedFlag(changeProcessor).get());
+ // unregister should only be called once
+ Mockito.verify(registration, Mockito.times(1)).unregister();
+ }
+
+
+ // helper methods
+
+ private static ChangeProcessor createChangeProcessor() {
+ ContentSession contentSession = Mockito.mock(ContentSession.class);
+ NamePathMapper namePathMapper = Mockito.mock(NamePathMapper.class);
+ ListenerTracker tracker = Mockito.mock(ListenerTracker.class);
+ FilterProvider filter = Mockito.mock(FilterProvider.class);
+ StatisticManager statisticManager =
Mockito.mock(StatisticManager.class);
+ int queueLength = 1;
+ CommitRateLimiter commitRateLimiter =
Mockito.mock(CommitRateLimiter.class);
+ BlobAccessProvider blobAccessProvider =
Mockito.mock(BlobAccessProvider.class);
+
+ return new ChangeProcessor(
+ contentSession,
+ namePathMapper,
+ tracker,
+ filter,
+ statisticManager,
+ queueLength,
+ commitRateLimiter,
+ blobAccessProvider
+ );
+ }
+
+ private static AtomicBoolean getStoppedFlag(ChangeProcessor
changeProcessor) {
+ try {
+ Field stoppedField =
ChangeProcessor.class.getDeclaredField("stopped");
+ stoppedField.setAccessible(true);
+ return (AtomicBoolean) stoppedField.get(changeProcessor);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static void setRegistration(ChangeProcessor changeProcessor,
CompositeRegistration registration) {
+ try {
+ Field regField =
ChangeProcessor.class.getDeclaredField("registration");
+ regField.setAccessible(true);
+ regField.set(changeProcessor, registration);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}