This is an automated email from the ASF dual-hosted git repository. daim pushed a commit to branch OAK-12115 in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git
commit e85c76b696b4cfed594fad942cead11cdfd69a3b Author: rishabhdaim <[email protected]> AuthorDate: Fri Feb 27 20:58:40 2026 +0530 OAK-12115 : add monitor and monitor.guard test coverage for SegmentBufferWriterPool --- .../SegmentBufferWriterPoolMonitorTest.java | 484 +++++++++++++++++++++ 1 file changed, 484 insertions(+) diff --git a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolMonitorTest.java b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolMonitorTest.java new file mode 100644 index 0000000000..ce5764e7d5 --- /dev/null +++ b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolMonitorTest.java @@ -0,0 +1,484 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.jackrabbit.oak.segment; + +import org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation; +import org.apache.jackrabbit.oak.segment.memory.MemoryStore; +import org.apache.jackrabbit.oak.segment.spi.persistence.GCGeneration; +import org.jetbrains.annotations.NotNull; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Tests for Monitor and Monitor.Guard usage in SegmentBufferWriterPool. + * These tests specifically verify the behavior of: + * - Monitor.enter() and Monitor.leave() in borrowWriter/returnWriter + * - Monitor.enterWhen(Guard) in flush() waiting for borrowed writers + * - Monitor.Guard.isSatisfied() in allReturned() + * - safeEnterWhen() interruption handling + */ +public class SegmentBufferWriterPoolMonitorTest { + + private MemoryStore store; + private SegmentBufferWriterPool pool; + private GCGeneration gcGeneration; + private ExecutorService executor; + + @Before + public void setUp() throws IOException { + store = new MemoryStore(); + gcGeneration = GCGeneration.NULL; + pool = SegmentBufferWriterPool.factory( + store.getSegmentIdProvider(), + "test", + () -> gcGeneration + ).newPool(SegmentBufferWriterPool.PoolType.GLOBAL); + executor = Executors.newFixedThreadPool(10); + } + + @After + public void tearDown() { + executor.shutdownNow(); + } + + /** + * Tests concurrent borrowWriter and returnWriter operations. + * This verifies Monitor.enter() and Monitor.leave() work correctly + * with multiple threads accessing the pool simultaneously. + */ + @Test + public void testConcurrentBorrowAndReturn() throws Exception { + int numThreads = 10; + int iterationsPerThread = 100; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + AtomicInteger successCount = new AtomicInteger(0); + + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < iterationsPerThread; j++) { + RecordId result = pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) { + // Simulate some work + return store.getRevisions().getHead(); + } + }); + Assert.assertNotNull(result); + successCount.incrementAndGet(); + } + } catch (Exception e) { + Assert.fail("Concurrent operation failed: " + e.getMessage()); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue("Threads should complete within timeout", + doneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertEquals("All operations should succeed", + numThreads * iterationsPerThread, successCount.get()); + } + + /** + * Tests that flush() waits for borrowed writers to be returned. + * This specifically tests Monitor.enterWhen(Guard) and Monitor.Guard.isSatisfied(). + */ + @Test + public void testFlushWaitsForBorrowedWriters() throws Exception { + CyclicBarrier barrier = new CyclicBarrier(2); + AtomicBoolean writeOperationStarted = new AtomicBoolean(false); + AtomicBoolean writeOperationCompleted = new AtomicBoolean(false); + AtomicBoolean flushCompleted = new AtomicBoolean(false); + + // Start a write operation that will hold a writer + Future<RecordId> writeFuture = executor.submit(() -> { + return pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException { + writeOperationStarted.set(true); + try { + // Wait for flush to be called + barrier.await(5, TimeUnit.SECONDS); + // Hold the writer for a bit to ensure flush is waiting + Thread.sleep(100); + writeOperationCompleted.set(true); + } catch (Exception e) { + throw new IOException(e); + } + return store.getRevisions().getHead(); + } + }); + }); + + // Wait for write operation to start + while (!writeOperationStarted.get()) { + Thread.sleep(10); + } + + // Start flush operation + Future<Void> flushFuture = executor.submit(() -> { + try { + // Signal that flush is about to be called + barrier.await(5, TimeUnit.SECONDS); + pool.flush(store); + flushCompleted.set(true); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + }); + + // Wait for both operations to complete + RecordId writeResult = writeFuture.get(10, TimeUnit.SECONDS); + flushFuture.get(10, TimeUnit.SECONDS); + + Assert.assertNotNull("Write operation should complete", writeResult); + Assert.assertTrue("Write operation should complete before flush", + writeOperationCompleted.get()); + Assert.assertTrue("Flush should complete after write operation returns writer", + flushCompleted.get()); + } + + /** + * Tests that multiple writers are properly waited for during flush. + * This tests the Monitor.Guard condition checking all borrowed writers. + */ + @Test + public void testFlushWaitsForMultipleBorrowedWriters() throws Exception { + int numWriters = 5; + CountDownLatch writersStarted = new CountDownLatch(numWriters); + CountDownLatch releaseWriters = new CountDownLatch(1); + AtomicInteger completedWrites = new AtomicInteger(0); + + List<Future<RecordId>> writeFutures = new ArrayList<>(); + + // Start multiple write operations + for (int i = 0; i < numWriters; i++) { + Future<RecordId> future = executor.submit(() -> { + return pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException { + writersStarted.countDown(); + try { + // Wait until all writers are borrowed + releaseWriters.await(5, TimeUnit.SECONDS); + completedWrites.incrementAndGet(); + } catch (InterruptedException e) { + throw new IOException(e); + } + return store.getRevisions().getHead(); + } + }); + }); + writeFutures.add(future); + } + + // Wait for all writers to be borrowed + Assert.assertTrue("All writers should be borrowed", + writersStarted.await(5, TimeUnit.SECONDS)); + + // Start flush operation + Future<Void> flushFuture = executor.submit(() -> { + try { + // Small delay to ensure flush starts after all writers borrowed + Thread.sleep(50); + pool.flush(store); + } catch (Exception e) { + throw new RuntimeException(e); + } + return null; + }); + + // Give flush a chance to start waiting + Thread.sleep(100); + + // Release all writers + releaseWriters.countDown(); + + // Wait for all writes to complete + for (Future<RecordId> future : writeFutures) { + Assert.assertNotNull("Write should complete", future.get(5, TimeUnit.SECONDS)); + } + + // Wait for flush to complete + flushFuture.get(5, TimeUnit.SECONDS); + + Assert.assertEquals("All writes should complete", numWriters, completedWrites.get()); + } + + /** + * Tests interrupted flush operation. + * This verifies safeEnterWhen() properly handles InterruptedException. + */ + @Test + public void testFlushWithInterruption() throws Exception { + CountDownLatch writerBorrowed = new CountDownLatch(1); + CountDownLatch flushStarted = new CountDownLatch(1); + + // Borrow a writer and hold it + Future<RecordId> writeFuture = executor.submit(() -> { + return pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException { + writerBorrowed.countDown(); + try { + // Hold writer for 5 seconds + Thread.sleep(5000); + } catch (InterruptedException e) { + // Expected if test is torn down + } + return store.getRevisions().getHead(); + } + }); + }); + + // Wait for writer to be borrowed + Assert.assertTrue("Writer should be borrowed", + writerBorrowed.await(2, TimeUnit.SECONDS)); + + // Start flush in separate thread and interrupt it + Thread flushThread = new Thread(() -> { + try { + flushStarted.countDown(); + pool.flush(store); + } catch (IOException e) { + // Ignore + } + }); + + flushThread.start(); + + // Wait for flush to start + Assert.assertTrue("Flush should start", flushStarted.await(2, TimeUnit.SECONDS)); + + // Give flush time to enter waiting state + Thread.sleep(100); + + // Interrupt the flush thread + flushThread.interrupt(); + + // Verify thread was interrupted (should exit quickly) + flushThread.join(1000); + Assert.assertFalse("Flush thread should terminate after interrupt", + flushThread.isAlive()); + + // The interrupted flag should be set + // Note: We can't easily verify this without access to thread internals + } + + /** + * Tests writer disposal during concurrent flush. + * This tests the scenario where a writer is returned after flush() starts. + */ + @Test + public void testWriterDisposalDuringFlush() throws Exception { + CountDownLatch writerBorrowed = new CountDownLatch(1); + CountDownLatch flushCanStart = new CountDownLatch(1); + CountDownLatch writerReturned = new CountDownLatch(1); + + // Borrow a writer + Future<RecordId> writeFuture = executor.submit(() -> { + return pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException { + writerBorrowed.countDown(); + try { + // Wait for flush to start + flushCanStart.await(5, TimeUnit.SECONDS); + // Small delay to ensure flush has collected borrowed writers + Thread.sleep(100); + } catch (InterruptedException e) { + throw new IOException(e); + } finally { + writerReturned.countDown(); + } + return store.getRevisions().getHead(); + } + }); + }); + + // Wait for writer to be borrowed + Assert.assertTrue("Writer should be borrowed", + writerBorrowed.await(2, TimeUnit.SECONDS)); + + // Start flush + Future<Void> flushFuture = executor.submit(() -> { + try { + flushCanStart.countDown(); + pool.flush(store); + } catch (IOException e) { + throw new RuntimeException(e); + } + return null; + }); + + // Wait for operations to complete + RecordId writeResult = writeFuture.get(10, TimeUnit.SECONDS); + flushFuture.get(10, TimeUnit.SECONDS); + + Assert.assertNotNull("Write should complete", writeResult); + Assert.assertEquals("Writer should be returned", 0, writerReturned.getCount()); + } + + /** + * Tests multiple concurrent flushes. + * This verifies Monitor enter/leave work correctly with contention. + */ + @Test + public void testMultipleConcurrentFlushes() throws Exception { + int numFlushes = 5; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numFlushes); + AtomicInteger successfulFlushes = new AtomicInteger(0); + + // Do some writes first + for (int i = 0; i < 10; i++) { + pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) { + return store.getRevisions().getHead(); + } + }); + } + + // Start multiple concurrent flushes + for (int i = 0; i < numFlushes; i++) { + executor.submit(() -> { + try { + startLatch.await(); + pool.flush(store); + successfulFlushes.incrementAndGet(); + } catch (Exception e) { + Assert.fail("Flush failed: " + e.getMessage()); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue("All flushes should complete", + doneLatch.await(10, TimeUnit.SECONDS)); + Assert.assertEquals("All flushes should succeed", + numFlushes, successfulFlushes.get()); + } + + /** + * Tests that borrowWriter and returnWriter maintain proper writer state. + * This verifies Monitor protection of the borrowed/disposed sets. + */ + @Test + public void testBorrowReturnStateConsistency() throws Exception { + int iterations = 100; + + for (int i = 0; i < iterations; i++) { + RecordId result = pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) throws IOException { + // Writer should be borrowed here + Assert.assertNotNull("Writer should not be null", writer); + return store.getRevisions().getHead(); + } + }); + Assert.assertNotNull("Result should not be null", result); + } + + // Flush should work without issues + pool.flush(store); + + // Should be able to borrow again after flush + RecordId result = pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) { + Assert.assertNotNull("Writer should not be null after flush", writer); + return store.getRevisions().getHead(); + } + }); + Assert.assertNotNull("Result after flush should not be null", result); + } + + /** + * Tests rapid borrow/return cycles under load. + * This stresses the Monitor enter/leave operations. + */ + @Test + public void testRapidBorrowReturnCycles() throws Exception { + int numThreads = 20; + int cyclesPerThread = 50; + CountDownLatch startLatch = new CountDownLatch(1); + CountDownLatch doneLatch = new CountDownLatch(numThreads); + AtomicInteger totalOperations = new AtomicInteger(0); + + for (int i = 0; i < numThreads; i++) { + executor.submit(() -> { + try { + startLatch.await(); + for (int j = 0; j < cyclesPerThread; j++) { + pool.execute(gcGeneration, new WriteOperation() { + @NotNull + @Override + public RecordId execute(@NotNull SegmentBufferWriter writer) { + totalOperations.incrementAndGet(); + return store.getRevisions().getHead(); + } + }); + } + } catch (Exception e) { + Assert.fail("Operation failed: " + e.getMessage()); + } finally { + doneLatch.countDown(); + } + }); + } + + startLatch.countDown(); + Assert.assertTrue("All operations should complete", + doneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertEquals("All operations should execute", + numThreads * cyclesPerThread, totalOperations.get()); + } +}
