jelmini commented on code in PR #692: URL: https://github.com/apache/jackrabbit-oak/pull/692#discussion_r1050652047
########## oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/SegmentBufferWriterPool.java: ########## @@ -82,151 +62,292 @@ public class SegmentBufferWriterPool implements WriteOperationHandler { private short writerId = -1; - public SegmentBufferWriterPool( + private SegmentBufferWriterPool( @NotNull SegmentIdProvider idProvider, @NotNull SegmentReader reader, @NotNull String wid, @NotNull Supplier<GCGeneration> gcGeneration) { - this.idProvider = checkNotNull(idProvider); - this.reader = checkNotNull(reader); - this.wid = checkNotNull(wid); - this.gcGeneration = checkNotNull(gcGeneration); + this.idProvider = idProvider; + this.reader = reader; + this.wid = wid; + this.gcGeneration = gcGeneration; } - @Override - @NotNull - public GCGeneration getGCGeneration() { - return gcGeneration.get(); + public enum PoolType { + GLOBAL, + THREAD_SPECIFIC; } - @NotNull - @Override - public RecordId execute(@NotNull GCGeneration gcGeneration, - @NotNull WriteOperation writeOperation) - throws IOException { - SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(currentThread(), gcGeneration); - SegmentBufferWriter writer = borrowWriter(key, gcGeneration); - try { - return writeOperation.execute(writer); - } finally { - returnWriter(key, writer); + public static class SegmentBufferWriterPoolFactory { + @NotNull + private final SegmentIdProvider idProvider; + @NotNull + private final SegmentReader reader; + @NotNull + private final String wid; + @NotNull + private final Supplier<GCGeneration> gcGeneration; + + private SegmentBufferWriterPoolFactory( + @NotNull SegmentIdProvider idProvider, + @NotNull SegmentReader reader, + @NotNull String wid, + @NotNull Supplier<GCGeneration> gcGeneration) { + this.idProvider = checkNotNull(idProvider); + this.reader = checkNotNull(reader); + this.wid = checkNotNull(wid); + this.gcGeneration = checkNotNull(gcGeneration); + } + + @NotNull + public SegmentBufferWriterPool newPool(@NotNull SegmentBufferWriterPool.PoolType 
poolType) { + switch (poolType) { + case GLOBAL: + return new GlobalSegmentBufferWriterPool(idProvider, reader, wid, gcGeneration); + case THREAD_SPECIFIC: + return new ThreadSpecificSegmentBufferWriterPool(idProvider, reader, wid, gcGeneration); + default: + throw new IllegalArgumentException("Unknown writer pool type."); + } } } - @Override - public void flush(@NotNull SegmentStore store) throws IOException { - List<SegmentBufferWriter> toFlush = newArrayList(); - List<SegmentBufferWriter> toReturn = newArrayList(); - - poolMonitor.enter(); - try { - // Collect all writers that are not currently in use and clear - // the list so they won't get re-used anymore. - toFlush.addAll(writers.values()); - writers.clear(); - - // Collect all borrowed writers, which we need to wait for. - // Clear the list so they will get disposed once returned. - toReturn.addAll(borrowed); - borrowed.clear(); - } finally { - poolMonitor.leave(); + public static SegmentBufferWriterPoolFactory factory( + @NotNull SegmentIdProvider idProvider, + @NotNull SegmentReader reader, + @NotNull String wid, + @NotNull Supplier<GCGeneration> gcGeneration) { + return new SegmentBufferWriterPoolFactory(idProvider, reader, wid, gcGeneration); + } + + private static class ThreadSpecificSegmentBufferWriterPool extends SegmentBufferWriterPool { + /** + * Read write lock protecting the state of this pool. Multiple threads can access their writers in parallel, + * acquiring the read lock. The writer lock is needed for the flush operation since it requires none + * of the writers to be in use. + */ + private final ReadWriteLock lock = new ReentrantReadWriteLock(true); + + /** + * Pool of writers. Every thread is assigned a unique writer per GC generation, therefore only requiring + * a concurrent map to synchronize access to them. 
+ */ + private final ConcurrentMap<Object, SegmentBufferWriter> writers = newConcurrentMap(); + + public ThreadSpecificSegmentBufferWriterPool( + @NotNull SegmentIdProvider idProvider, + @NotNull SegmentReader reader, + @NotNull String wid, + @NotNull Supplier<GCGeneration> gcGeneration) { + super(idProvider, reader, wid, gcGeneration); } - // Wait for the return of the borrowed writers. This is the - // case once all of them appear in the disposed set. - if (safeEnterWhen(poolMonitor, allReturned(toReturn))) { + @NotNull + @Override + public RecordId execute(@NotNull GCGeneration gcGeneration, + @NotNull WriteOperation writeOperation) + throws IOException { + lock.readLock().lock(); + SegmentBufferWriter writer = getWriter(currentThread(), gcGeneration); try { - // Collect all disposed writers and clear the list to mark them - // as flushed. - toFlush.addAll(toReturn); - disposed.removeAll(toReturn); + return writeOperation.execute(writer); } finally { - poolMonitor.leave(); + lock.readLock().unlock(); + } + } + + @Override + public void flush(@NotNull SegmentStore store) throws IOException { + lock.writeLock().lock(); + try { + for (SegmentBufferWriter writer : writers.values()) { + writer.flush(store); + } + writers.clear(); + } finally { + lock.writeLock().unlock(); } } - // Call flush from outside the pool monitor to avoid potential - // deadlocks of that method calling SegmentStore.writeSegment - for (SegmentBufferWriter writer : toFlush) { - writer.flush(store); + @NotNull + private SegmentBufferWriter getWriter(@NotNull Thread thread, @NotNull GCGeneration gcGeneration) { + SimpleImmutableEntry<?,?> key = new SimpleImmutableEntry<>(thread, gcGeneration); + SegmentBufferWriter writer = writers.get(key); Review Comment: This code can be replaced with: ```java return writers.computeIfAbsent(key, k -> newWriter(gcGeneration)); ``` -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: dev-unsubscr...@jackrabbit.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org