This is an automated email from the ASF dual-hosted git repository.

adulceanu pushed a commit to branch issues/OAK-9922
in repository https://gitbox.apache.org/repos/asf/jackrabbit-oak.git

commit 9402ecb9fe028ad615f7a8a965d17f5b267d4888
Author: Lucas Weitzendorf <lweitzend...@adobe.com>
AuthorDate: Tue Oct 11 16:32:42 2022 +0200

    OAK-9922 Parallel Compaction
    Make SegmentBufferWriterPool implementation a configurable option
---
 .../oak/segment/DefaultSegmentWriterBuilder.java   |  60 ++--
 .../oak/segment/SegmentBufferWriterPool.java       | 314 +++++++++++++++++----
 .../jackrabbit/oak/segment/file/FileStore.java     |  12 +-
 .../jackrabbit/oak/segment/memory/MemoryStore.java |  16 +-
 .../jackrabbit/oak/segment/NodeRecordTest.java     |   2 +-
 .../segment/ParallelCompactorExternalBlobTest.java |   2 +-
 .../oak/segment/ParallelCompactorTest.java         |   2 +-
 .../oak/segment/SegmentBufferWriterPoolTest.java   |  25 +-
 8 files changed, 310 insertions(+), 123 deletions(-)

diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
index 94ae636a4b..4d70c23c2c 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/DefaultSegmentWriterBuilder.java
@@ -20,6 +20,7 @@
 package org.apache.jackrabbit.oak.segment;
 
 import static 
org.apache.jackrabbit.guava.common.base.Preconditions.checkNotNull;
+import static 
org.apache.jackrabbit.oak.segment.SegmentBufferWriterPool.PoolType;
 
 import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.guava.common.base.Suppliers;
@@ -29,10 +30,11 @@ import 
org.apache.jackrabbit.oak.segment.file.ReadOnlyFileStore;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * Builder for building {@link DefaultSegmentWriter} instances.
- * The returned instances are thread safe if {@link #withWriterPool()}
+ * The returned instances are thread safe if {@link #withWriterPool(PoolType)}
 * was specified and <em>not</em> thread safe if {@link #withoutWriterPool()}
  * was specified (default).
  * <p>
@@ -56,7 +58,7 @@ public final class DefaultSegmentWriterBuilder {
     @NotNull
     private Supplier<GCGeneration> generation = 
Suppliers.ofInstance(GCGeneration.NULL);
 
-    private boolean pooled = false;
+    private PoolType poolType = null;
 
     @NotNull
     private WriterCacheManager cacheManager = new WriterCacheManager.Default();
@@ -81,7 +83,7 @@ public final class DefaultSegmentWriterBuilder {
      * If {@link #withoutWriterPool()} was specified all segments will be 
written
      * at the generation that {@code generation.get()} returned at the time
      * any of the {@code build()} methods is called.
-     * If {@link #withWriterPool()} was specified a segments will be written
+     * If {@link #withWriterPool(PoolType)} was specified, segments will 
be written
      * at the generation that {@code generation.get()} returns when a new 
segment
      * is created by the returned writer.
      */
@@ -106,8 +108,8 @@ public final class DefaultSegmentWriterBuilder {
      * The returned instance is thread safe.
      */
     @NotNull
-    public DefaultSegmentWriterBuilder withWriterPool() {
-        this.pooled = true;
+    public DefaultSegmentWriterBuilder withWriterPool(PoolType writerType) {
+        this.poolType = writerType;
         return this;
     }
 
@@ -117,7 +119,7 @@ public final class DefaultSegmentWriterBuilder {
      */
     @NotNull
     public DefaultSegmentWriterBuilder withoutWriterPool() {
-        this.pooled = false;
+        this.poolType = null;
         return this;
     }
 
@@ -151,7 +153,7 @@ public final class DefaultSegmentWriterBuilder {
                 store.getSegmentIdProvider(),
                 store.getBlobStore(),
                 cacheManager,
-                createWriter(store, pooled),
+                createWriter(store, poolType),
                 store.getBinariesInlineThreshold()
         );
     }
@@ -204,47 +206,27 @@ public final class DefaultSegmentWriterBuilder {
                 store.getSegmentIdProvider(),
                 store.getBlobStore(),
                 cacheManager,
-                createWriter(store, pooled),
+                createWriter(store, poolType),
                 Segment.MEDIUM_LIMIT
         );
     }
 
     @NotNull
-    private WriteOperationHandler createWriter(@NotNull FileStore store, 
boolean pooled) {
-        if (pooled) {
-            return new SegmentBufferWriterPool(
-                    store.getSegmentIdProvider(),
-                    store.getReader(),
-                    name,
-                    generation
-            );
-        } else {
-            return new SegmentBufferWriter(
-                    store.getSegmentIdProvider(),
-                    store.getReader(),
-                    name,
-                    generation.get()
-            );
-        }
+    private WriteOperationHandler createWriter(@NotNull FileStore store, 
@Nullable PoolType poolType) {
+        return createWriter(store.getSegmentIdProvider(), store.getReader(), 
poolType);
+    }
+
+    @NotNull
+    private WriteOperationHandler createWriter(@NotNull MemoryStore store, 
@Nullable PoolType poolType) {
+        return createWriter(store.getSegmentIdProvider(), store.getReader(), 
poolType);
     }
 
     @NotNull
-    private WriteOperationHandler createWriter(@NotNull MemoryStore store, 
boolean pooled) {
-        if (pooled) {
-            return new SegmentBufferWriterPool(
-                    store.getSegmentIdProvider(),
-                    store.getReader(),
-                    name,
-                    generation
-            );
+    private WriteOperationHandler createWriter(@NotNull SegmentIdProvider 
idProvider, @NotNull SegmentReader reader, @Nullable PoolType poolType) {
+        if (poolType == null) {
+            return new SegmentBufferWriter(idProvider, reader, name, 
generation.get());
         } else {
-            return new SegmentBufferWriter(
-                    store.getSegmentIdProvider(),
-                    store.getReader(),
-                    name,
-                    generation.get()
-            );
+            return SegmentBufferWriterPool.factory(idProvider, reader, name, 
generation).newPool(poolType);
         }
     }
-
 }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
index b37ec2ad01..1ee0361744 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java
@@ -29,6 +29,9 @@ import static java.util.Objects.requireNonNull;
 
 import java.io.IOException;
 import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -45,20 +48,7 @@ import org.jetbrains.annotations.NotNull;
  * <p>
  * Instances of this class are thread safe.
  */
-public class SegmentBufferWriterPool implements WriteOperationHandler {
-    /**
-     * Read write lock protecting the state of this pool. Multiple threads can 
access their writers in parallel,
-     * acquiring the read lock. The writer lock is needed for the flush 
operation since it requires none
-     * of the writers to be in use.
-     */
-    private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
-
-    /**
-     * Pool of writers. Every thread is assigned a unique writer per GC 
generation, therefore only requiring
-     * a concurrent map to synchronize access to them.
-     */
-    private final ConcurrentMap<Object, SegmentBufferWriter> writers = 
newConcurrentMap();
-
+public abstract class SegmentBufferWriterPool implements WriteOperationHandler 
{
     @NotNull
     private final SegmentIdProvider idProvider;
 
@@ -73,7 +63,7 @@ public class SegmentBufferWriterPool implements 
WriteOperationHandler {
 
     private short writerId = -1;
 
-    public SegmentBufferWriterPool(
+    private SegmentBufferWriterPool(
             @NotNull SegmentIdProvider idProvider,
             @NotNull SegmentReader reader,
             @NotNull String wid,
@@ -84,55 +74,277 @@ public class SegmentBufferWriterPool implements 
WriteOperationHandler {
         this.gcGeneration = requireNonNull(gcGeneration);
     }
 
-    @NotNull
-    @Override
-    public GCGeneration getGCGeneration() {
-        return gcGeneration.get();
+    public enum PoolType {
+        GLOBAL,
+        THREAD_SPECIFIC;
     }
 
-    @Override
-    public void flush(@NotNull SegmentStore store) throws IOException {
-        lock.writeLock().lock();
-        try {
-            for (SegmentBufferWriter writer : writers.values()) {
+    public static class SegmentBufferWriterPoolFactory {
+        @NotNull
+        private final SegmentIdProvider idProvider;
+        @NotNull
+        private final SegmentReader reader;
+        @NotNull
+        private final String wid;
+        @NotNull
+        private final Supplier<GCGeneration> gcGeneration;
+
+        private SegmentBufferWriterPoolFactory(
+                @NotNull SegmentIdProvider idProvider,
+                @NotNull SegmentReader reader,
+                @NotNull String wid,
+                @NotNull Supplier<GCGeneration> gcGeneration) {
+            this.idProvider = requireNonNull(idProvider);
+            this.reader = requireNonNull(reader);
+            this.wid = requireNonNull(wid);
+            this.gcGeneration = requireNonNull(gcGeneration);
+        }
+
+        @NotNull
+        public SegmentBufferWriterPool newPool(@NotNull 
SegmentBufferWriterPool.PoolType poolType) {
+            switch (poolType) {
+                case GLOBAL:
+                    return new GlobalSegmentBufferWriterPool(idProvider, 
reader, wid, gcGeneration);
+                case THREAD_SPECIFIC:
+                    return new 
ThreadSpecificSegmentBufferWriterPool(idProvider, reader, wid, gcGeneration);
+                default:
+                    throw new IllegalArgumentException("Unknown writer pool 
type.");
+            }
+        }
+    }
+
+    public static SegmentBufferWriterPoolFactory factory(
+            @NotNull SegmentIdProvider idProvider,
+            @NotNull SegmentReader reader,
+            @NotNull String wid,
+            @NotNull Supplier<GCGeneration> gcGeneration) {
+        return new SegmentBufferWriterPoolFactory(idProvider, reader, wid, 
gcGeneration);
+    }
+
+    private static class ThreadSpecificSegmentBufferWriterPool extends 
SegmentBufferWriterPool {
+        /**
+         * Read write lock protecting the state of this pool. Multiple threads 
can access their writers in parallel,
+         * acquiring the read lock. The writer lock is needed for the flush 
operation since it requires none
+         * of the writers to be in use.
+         */
+        private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+        /**
+         * Pool of writers. Every thread is assigned a unique writer per GC 
generation, therefore only requiring
+         * a concurrent map to synchronize access to them.
+         */
+        private final ConcurrentMap<Object, SegmentBufferWriter> writers = 
newConcurrentMap();
+
+        public ThreadSpecificSegmentBufferWriterPool(
+                @NotNull SegmentIdProvider idProvider,
+                @NotNull SegmentReader reader,
+                @NotNull String wid,
+                @NotNull Supplier<GCGeneration> gcGeneration) {
+            super(idProvider, reader, wid, gcGeneration);
+        }
+
+        @NotNull
+        @Override
+        public RecordId execute(@NotNull GCGeneration gcGeneration,
+                                @NotNull WriteOperation writeOperation)
+                throws IOException {
+            lock.readLock().lock();
+            SegmentBufferWriter writer = getWriter(currentThread(), 
gcGeneration);
+            try {
+                return writeOperation.execute(writer);
+            } finally {
+                lock.readLock().unlock();
+            }
+        }
+
+        @Override
+        public void flush(@NotNull SegmentStore store) throws IOException {
+            lock.writeLock().lock();
+            try {
+                for (SegmentBufferWriter writer : writers.values()) {
+                    writer.flush(store);
+                }
+                writers.clear();
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull 
GCGeneration gcGeneration) {
+            SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, 
gcGeneration);
+            SegmentBufferWriter writer = writers.get(key);
+            if (writer == null) {
+                writer = newWriter(gcGeneration);
+                writers.put(key, writer);
+            }
+            return writer;
+        }
+    }
+
+    private static class GlobalSegmentBufferWriterPool extends 
SegmentBufferWriterPool {
+        /**
+         * Monitor protecting the state of this pool. Neither of {@link 
#writers},
+         * {@link #borrowed} and {@link #disposed} must be modified without 
owning
+         * this monitor.
+         */
+        private final Monitor poolMonitor = new Monitor(true);
+
+        /**
+         * Pool of current writers that are not in use
+         */
+        private final Map<Object, SegmentBufferWriter> writers = newHashMap();
+
+        /**
+         * Writers that are currently in use
+         */
+        private final Set<SegmentBufferWriter> borrowed = newHashSet();
+
+        /**
+         * Retired writers that have not yet been flushed
+         */
+        private final Set<SegmentBufferWriter> disposed = newHashSet();
+
+        public GlobalSegmentBufferWriterPool(
+                @NotNull SegmentIdProvider idProvider,
+                @NotNull SegmentReader reader,
+                @NotNull String wid,
+                @NotNull Supplier<GCGeneration> gcGeneration) {
+            super(idProvider, reader, wid, gcGeneration);
+        }
+
+        @NotNull
+        @Override
+        public RecordId execute(@NotNull GCGeneration gcGeneration, @NotNull 
WriteOperation writeOperation)
+                throws IOException {
+            SimpleImmutableEntry<?,?> key = new 
SimpleImmutableEntry<>(currentThread(), gcGeneration);
+            SegmentBufferWriter writer = borrowWriter(key, gcGeneration);
+            try {
+                return writeOperation.execute(writer);
+            } finally {
+                returnWriter(key, writer);
+            }
+        }
+
+        @Override
+        public void flush(@NotNull SegmentStore store) throws IOException {
+            List<SegmentBufferWriter> toFlush = newArrayList();
+            List<SegmentBufferWriter> toReturn = newArrayList();
+
+            poolMonitor.enter();
+            try {
+                // Collect all writers that are not currently in use and clear
+                // the list so they won't get re-used anymore.
+                toFlush.addAll(writers.values());
+                writers.clear();
+
+                // Collect all borrowed writers, which we need to wait for.
+                // Clear the list so they will get disposed once returned.
+                toReturn.addAll(borrowed);
+                borrowed.clear();
+            } finally {
+                poolMonitor.leave();
+            }
+
+            // Wait for the return of the borrowed writers. This is the
+            // case once all of them appear in the disposed set.
+            if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
+                try {
+                    // Collect all disposed writers and clear the list to mark 
them
+                    // as flushed.
+                    toFlush.addAll(toReturn);
+                    disposed.removeAll(toReturn);
+                } finally {
+                    poolMonitor.leave();
+                }
+            }
+
+            // Call flush from outside the pool monitor to avoid potential
+            // deadlocks of that method calling SegmentStore.writeSegment
+            for (SegmentBufferWriter writer : toFlush) {
                 writer.flush(store);
             }
-            writers.clear();
-        } finally {
-            lock.writeLock().unlock();
+        }
+
+        /**
+         * Create a {@code Guard} that is satisfied if and only if {@link 
#disposed}
+         * contains all items in {@code toReturn}
+         */
+        @NotNull
+        private Monitor.Guard allReturned(final List<SegmentBufferWriter> 
toReturn) {
+            return new Monitor.Guard(poolMonitor) {
+
+                @Override
+                public boolean isSatisfied() {
+                    return disposed.containsAll(toReturn);
+                }
+
+            };
+        }
+
+        /**
+         * Same as {@code monitor.enterWhen(guard)} but copes with that pesky 
{@code
+         * InterruptedException} by catching it and setting this thread's
+         * interrupted flag.
+         */
+        private static boolean safeEnterWhen(Monitor monitor, Monitor.Guard 
guard) {
+            try {
+                monitor.enterWhen(guard);
+                return true;
+            } catch (InterruptedException ignore) {
+                currentThread().interrupt();
+                return false;
+            }
+        }
+
+        /**
+         * Return a writer from the pool by its {@code key}. This method may 
return
+         * a fresh writer at any time. Callers need to return a writer before
+         * borrowing it again. Failing to do so leads to undefined behaviour.
+         */
+        private SegmentBufferWriter borrowWriter(@NotNull Object key, @NotNull 
GCGeneration gcGeneration) {
+            poolMonitor.enter();
+            try {
+                SegmentBufferWriter writer = writers.remove(key);
+                if (writer == null) {
+                    writer = newWriter(gcGeneration);
+                }
+                borrowed.add(writer);
+                return writer;
+            } finally {
+                poolMonitor.leave();
+            }
+        }
+
+        /**
+         * Return a writer to the pool using the {@code key} that was used to 
borrow
+         * it.
+         */
+        private void returnWriter(Object key, SegmentBufferWriter writer) {
+            poolMonitor.enter();
+            try {
+                if (borrowed.remove(writer)) {
+                    checkState(writers.put(key, writer) == null);
+                } else {
+                    // Defer flushing this writer, as it was borrowed while 
flush() was called.
+                    disposed.add(writer);
+                }
+            } finally {
+                poolMonitor.leave();
+            }
         }
     }
 
     @NotNull
     @Override
-    public RecordId execute(@NotNull GCGeneration gcGeneration,
-                            @NotNull WriteOperation writeOperation)
-    throws IOException {
-        lock.readLock().lock();
-        SegmentBufferWriter writer = getWriter(currentThread(), gcGeneration);
-        try {
-            return writeOperation.execute(writer);
-        } finally {
-            lock.readLock().unlock();
-        }
+    public GCGeneration getGCGeneration() {
+        return gcGeneration.get();
     }
 
-    private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull 
GCGeneration gcGeneration) {
-        SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, 
gcGeneration);
-        SegmentBufferWriter writer = writers.get(key);
-        if (writer == null) {
-             writer = new SegmentBufferWriter(
-                    idProvider,
-                    reader,
-                    getWriterId(wid),
-                    gcGeneration
-            );
-            writers.put(key, writer);
-        }
-        return writer;
+    protected SegmentBufferWriter newWriter(@NotNull GCGeneration 
gcGeneration) {
+        return new SegmentBufferWriter(idProvider, reader, getWriterId(), 
gcGeneration);
     }
 
-    private String getWriterId(String wid) {
+    protected String getWriterId() {
         if (++writerId > 9999) {
             writerId = 0;
         }
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
index 099b3e53e0..d7c1a9f430 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/file/FileStore.java
@@ -39,13 +39,7 @@ import org.apache.jackrabbit.guava.common.base.Supplier;
 import org.apache.jackrabbit.guava.common.io.Closer;
 import 
org.apache.jackrabbit.guava.common.util.concurrent.UncheckedExecutionException;
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.RecordId;
-import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentNodeState;
-import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
-import org.apache.jackrabbit.oak.segment.SegmentNotFoundExceptionListener;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.*;
 import org.apache.jackrabbit.oak.segment.compaction.SegmentGCOptions;
 import org.apache.jackrabbit.oak.segment.file.ShutDown.ShutDownCloser;
 import org.apache.jackrabbit.oak.segment.file.cancel.Canceller;
@@ -145,7 +139,7 @@ public class FileStore extends AbstractFileStore {
 
         this.segmentWriter = defaultSegmentWriterBuilder("sys")
                 .withGeneration(() -> getGcGeneration().nonGC())
-                .withWriterPool()
+                .withWriterPool(SegmentBufferWriterPool.PoolType.GLOBAL)
                 .with(builder.getCacheManager()
                         .withAccessTracking("WRITE", statsProvider))
                 .build(this);
@@ -197,7 +191,7 @@ public class FileStore extends AbstractFileStore {
                 defaultSegmentWriterBuilder("c")
                     
.with(builder.getCacheManager().withAccessTracking("COMPACT", statsProvider))
                     .withGeneration(generation)
-                    .withWriterPool()
+                    
.withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                     .build(this)
         );
 
diff --git 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
index 50b0ee99f7..916f4cab08 100644
--- 
a/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
+++ 
b/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/memory/MemoryStore.java
@@ -27,17 +27,7 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.jackrabbit.guava.common.collect.Maps;
 
 import org.apache.jackrabbit.oak.commons.Buffer;
-import org.apache.jackrabbit.oak.segment.CachingSegmentReader;
-import org.apache.jackrabbit.oak.segment.Revisions;
-import org.apache.jackrabbit.oak.segment.Segment;
-import org.apache.jackrabbit.oak.segment.SegmentId;
-import org.apache.jackrabbit.oak.segment.SegmentIdFactory;
-import org.apache.jackrabbit.oak.segment.SegmentIdProvider;
-import org.apache.jackrabbit.oak.segment.SegmentNotFoundException;
-import org.apache.jackrabbit.oak.segment.SegmentReader;
-import org.apache.jackrabbit.oak.segment.SegmentStore;
-import org.apache.jackrabbit.oak.segment.SegmentTracker;
-import org.apache.jackrabbit.oak.segment.SegmentWriter;
+import org.apache.jackrabbit.oak.segment.*;
 import org.apache.jackrabbit.oak.spi.blob.BlobStore;
 import org.apache.jackrabbit.oak.stats.NoopStats;
 import org.jetbrains.annotations.NotNull;
@@ -72,7 +62,9 @@ public class MemoryStore implements SegmentStore {
         });
         this.revisions = new MemoryStoreRevisions();
         this.segmentReader = new CachingSegmentReader(this::getWriter, null, 
16, 2, NoopStats.INSTANCE);
-        this.segmentWriter = 
defaultSegmentWriterBuilder("sys").withWriterPool().build(this);
+        this.segmentWriter = defaultSegmentWriterBuilder("sys")
+                .withWriterPool(SegmentBufferWriterPool.PoolType.GLOBAL)
+                .build(this);
         revisions.bind(this);
         segmentWriter.flush();
     }
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
index ae6856a857..5ea1f24d79 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/NodeRecordTest.java
@@ -105,7 +105,7 @@ public class NodeRecordTest {
 
             SegmentWriter writer = defaultSegmentWriterBuilder("test")
                     .withGeneration(generation)
-                    .withWriterPool()
+                    .withWriterPool(SegmentBufferWriterPool.PoolType.GLOBAL)
                     .with(nodesOnlyCache())
                     .build(store);
 
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
index 8aea965f66..661a037876 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorExternalBlobTest.java
@@ -31,7 +31,7 @@ public class ParallelCompactorExternalBlobTest extends 
AbstractCompactorExternal
     protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, 
@NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c")
                 .withGeneration(generation)
-                .withWriterPool()
+                
.withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                 .build(fileStore);
 
         return new ParallelCompactor(
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
index 596022bba3..627a3acf4a 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/ParallelCompactorTest.java
@@ -31,7 +31,7 @@ public class ParallelCompactorTest extends 
AbstractCompactorTest {
     protected ParallelCompactor createCompactor(@NotNull FileStore fileStore, 
@NotNull GCGeneration generation) {
         SegmentWriter writer = defaultSegmentWriterBuilder("c")
                 .withGeneration(generation)
-                .withWriterPool()
+                
.withWriterPool(SegmentBufferWriterPool.PoolType.THREAD_SPECIFIC)
                 .build(fileStore);
 
         return new ParallelCompactor(
diff --git 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
index 6431ca7902..c7417e1469 100644
--- 
a/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
+++ 
b/oak-segment-tar/src/test/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPoolTest.java
@@ -29,7 +29,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
-import java.util.concurrent.Callable;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
@@ -40,10 +41,12 @@ import 
org.apache.jackrabbit.oak.segment.WriteOperationHandler.WriteOperation;
 import org.apache.jackrabbit.oak.segment.file.tar.GCGeneration;
 import org.apache.jackrabbit.oak.segment.memory.MemoryStore;
 import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 public class SegmentBufferWriterPoolTest {
     private final MemoryStore store = new MemoryStore();
 
@@ -51,17 +54,21 @@ public class SegmentBufferWriterPoolTest {
 
     private GCGeneration gcGeneration = GCGeneration.NULL;
 
-    private final SegmentBufferWriterPool pool = new SegmentBufferWriterPool(
-            store.getSegmentIdProvider(),
-            store.getReader(),
-            "",
-            () -> gcGeneration
-    );
+    private final SegmentBufferWriterPool pool;
 
     private final ExecutorService[] executors = new ExecutorService[] {
         newSingleThreadExecutor(), newSingleThreadExecutor(), 
newSingleThreadExecutor()};
 
-    public SegmentBufferWriterPoolTest() throws IOException { }
+    @Parameterized.Parameters
+    public static List<SegmentBufferWriterPool.PoolType> poolTypes() {
+        return Arrays.asList(SegmentBufferWriterPool.PoolType.values());
+    }
+
+    public SegmentBufferWriterPoolTest(SegmentBufferWriterPool.PoolType 
poolType) throws IOException {
+        pool = SegmentBufferWriterPool.factory(
+                store.getSegmentIdProvider(), store.getReader(), "", () -> 
gcGeneration)
+                .newPool(poolType);
+    }
 
     @After
     public void tearDown() {

Reply via email to