jelmini commented on code in PR #692:
URL: https://github.com/apache/jackrabbit-oak/pull/692#discussion_r1050652047


##########
oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java:
##########
@@ -82,151 +62,292 @@ public class SegmentBufferWriterPool implements 
WriteOperationHandler {
 
     private short writerId = -1;
 
-    public SegmentBufferWriterPool(
+    private SegmentBufferWriterPool(
             @NotNull SegmentIdProvider idProvider,
             @NotNull SegmentReader reader,
             @NotNull String wid,
             @NotNull Supplier<GCGeneration> gcGeneration) {
-        this.idProvider = checkNotNull(idProvider);
-        this.reader = checkNotNull(reader);
-        this.wid = checkNotNull(wid);
-        this.gcGeneration = checkNotNull(gcGeneration);
+        this.idProvider = idProvider;
+        this.reader = reader;
+        this.wid = wid;
+        this.gcGeneration = gcGeneration;
     }
 
-    @Override
-    @NotNull
-    public GCGeneration getGCGeneration() {
-        return gcGeneration.get();
+    public enum PoolType {
+        GLOBAL,
+        THREAD_SPECIFIC;
     }
 
-    @NotNull
-    @Override
-    public RecordId execute(@NotNull GCGeneration gcGeneration,
-                            @NotNull WriteOperation writeOperation)
-    throws IOException {
-        SimpleImmutableEntry<?,?> key = new 
SimpleImmutableEntry<>(currentThread(), gcGeneration);
-        SegmentBufferWriter writer = borrowWriter(key, gcGeneration);
-        try {
-            return writeOperation.execute(writer);
-        } finally {
-            returnWriter(key, writer);
+    public static class SegmentBufferWriterPoolFactory {
+        @NotNull
+        private final SegmentIdProvider idProvider;
+        @NotNull
+        private final SegmentReader reader;
+        @NotNull
+        private final String wid;
+        @NotNull
+        private final Supplier<GCGeneration> gcGeneration;
+
+        private SegmentBufferWriterPoolFactory(
+                @NotNull SegmentIdProvider idProvider,
+                @NotNull SegmentReader reader,
+                @NotNull String wid,
+                @NotNull Supplier<GCGeneration> gcGeneration) {
+            this.idProvider = checkNotNull(idProvider);
+            this.reader = checkNotNull(reader);
+            this.wid = checkNotNull(wid);
+            this.gcGeneration = checkNotNull(gcGeneration);
+        }
+
+        @NotNull
+        public SegmentBufferWriterPool newPool(@NotNull 
SegmentBufferWriterPool.PoolType poolType) {
+            switch (poolType) {
+                case GLOBAL:
+                    return new GlobalSegmentBufferWriterPool(idProvider, 
reader, wid, gcGeneration);
+                case THREAD_SPECIFIC:
+                    return new 
ThreadSpecificSegmentBufferWriterPool(idProvider, reader, wid, gcGeneration);
+                default:
+                    throw new IllegalArgumentException("Unknown writer pool 
type.");
+            }
         }
     }
 
-    @Override
-    public void flush(@NotNull SegmentStore store) throws IOException {
-        List<SegmentBufferWriter> toFlush = newArrayList();
-        List<SegmentBufferWriter> toReturn = newArrayList();
-
-        poolMonitor.enter();
-        try {
-            // Collect all writers that are not currently in use and clear
-            // the list so they won't get re-used anymore.
-            toFlush.addAll(writers.values());
-            writers.clear();
-
-            // Collect all borrowed writers, which we need to wait for.
-            // Clear the list so they will get disposed once returned.
-            toReturn.addAll(borrowed);
-            borrowed.clear();
-        } finally {
-            poolMonitor.leave();
+    public static SegmentBufferWriterPoolFactory factory(
+            @NotNull SegmentIdProvider idProvider,
+            @NotNull SegmentReader reader,
+            @NotNull String wid,
+            @NotNull Supplier<GCGeneration> gcGeneration) {
+        return new SegmentBufferWriterPoolFactory(idProvider, reader, wid, 
gcGeneration);
+    }
+
+    private static class ThreadSpecificSegmentBufferWriterPool extends 
SegmentBufferWriterPool {
+        /**
+         * Read write lock protecting the state of this pool. Multiple threads 
can access their writers in parallel,
+         * acquiring the read lock. The writer lock is needed for the flush 
operation since it requires none
+         * of the writers to be in use.
+         */
+        private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
+
+        /**
+         * Pool of writers. Every thread is assigned a unique writer per GC 
generation, therefore only requiring
+         * a concurrent map to synchronize access to them.
+         */
+        private final ConcurrentMap<Object, SegmentBufferWriter> writers = 
newConcurrentMap();
+
+        public ThreadSpecificSegmentBufferWriterPool(
+                @NotNull SegmentIdProvider idProvider,
+                @NotNull SegmentReader reader,
+                @NotNull String wid,
+                @NotNull Supplier<GCGeneration> gcGeneration) {
+            super(idProvider, reader, wid, gcGeneration);
         }
 
-        // Wait for the return of the borrowed writers. This is the
-        // case once all of them appear in the disposed set.
-        if (safeEnterWhen(poolMonitor, allReturned(toReturn))) {
+        @NotNull
+        @Override
+        public RecordId execute(@NotNull GCGeneration gcGeneration,
+                                @NotNull WriteOperation writeOperation)
+                throws IOException {
+            lock.readLock().lock();
+            SegmentBufferWriter writer = getWriter(currentThread(), 
gcGeneration);
             try {
-                // Collect all disposed writers and clear the list to mark them
-                // as flushed.
-                toFlush.addAll(toReturn);
-                disposed.removeAll(toReturn);
+                return writeOperation.execute(writer);
             } finally {
-                poolMonitor.leave();
+                lock.readLock().unlock();
+            }
+        }
+
+        @Override
+        public void flush(@NotNull SegmentStore store) throws IOException {
+            lock.writeLock().lock();
+            try {
+                for (SegmentBufferWriter writer : writers.values()) {
+                    writer.flush(store);
+                }
+                writers.clear();
+            } finally {
+                lock.writeLock().unlock();
             }
         }
 
-        // Call flush from outside the pool monitor to avoid potential
-        // deadlocks of that method calling SegmentStore.writeSegment
-        for (SegmentBufferWriter writer : toFlush) {
-            writer.flush(store);
+        @NotNull
+        private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull 
GCGeneration gcGeneration) {
+            SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, 
gcGeneration);
+            SegmentBufferWriter writer = writers.get(key);

Review Comment:
   This code can be replaced with:
   ```java
   return writers.computeIfAbsent(key, k -> newWriter(gcGeneration));
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@jackrabbit.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to